diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index f6ee645e1..1640902db 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -28,7 +28,6 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/beacon" "github.com/ethereum/go-ethereum/core/rawdb" - "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" @@ -349,11 +348,3 @@ func (api *ConsensusAPI) assembleBlock(parentHash common.Hash, params *beacon.Pa } return beacon.BlockToExecutableData(block), nil } - -// Used in tests to add a the list of transactions from a block to the tx pool. -func (api *ConsensusAPI) insertTransactions(txs types.Transactions) error { - for _, tx := range txs { - api.eth.TxPool().AddLocal(tx) - } - return nil -} diff --git a/eth/catalyst/api_test.go b/eth/catalyst/api_test.go index de2e58a4f..bbaa8ae16 100644 --- a/eth/catalyst/api_test.go +++ b/eth/catalyst/api_test.go @@ -108,7 +108,7 @@ func TestEth2AssembleBlockWithAnotherBlocksTxs(t *testing.T) { api := NewConsensusAPI(ethservice) // Put the 10th block's tx in the pool and produce a new block - api.insertTransactions(blocks[9].Transactions()) + api.eth.TxPool().AddRemotesSync(blocks[9].Transactions()) blockParams := beacon.PayloadAttributesV1{ Timestamp: blocks[8].Time() + 5, } diff --git a/miner/worker.go b/miner/worker.go index c6927a1ca..31022e7e1 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -77,6 +77,11 @@ const ( staleThreshold = 7 ) +var ( + errBlockInterruptedByNewHead = errors.New("new head arrived while building block") + errBlockInterruptedByRecommit = errors.New("recommit interrupt while building block") +) + // environment is the worker's current environment and holds all // information of the sealing block generation. type environment struct { @@ -841,7 +846,7 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction) ([]* return receipt.Logs, nil } -func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32) bool { +func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32) error { gasLimit := env.header.GasLimit if env.gasPool == nil { env.gasPool = new(core.GasPool).AddGas(gasLimit) @@ -866,8 +871,9 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP ratio: ratio, inc: true, } + return errBlockInterruptedByRecommit } - return atomic.LoadInt32(interrupt) == commitInterruptNewHead + return errBlockInterruptedByNewHead } // If we don't have enough gas for any further transactions then we're done if env.gasPool.Gas() < params.TxGas { @@ -951,7 +957,7 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP if interrupt != nil { w.resubmitAdjustCh <- &intervalAdjust{inc: false} } - return false + return nil } // generateParams wraps various of settings for generating sealing task. @@ -1050,7 +1056,7 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) { // fillTransactions retrieves the pending transactions from the txpool and fills them // into the given sealing block. The transaction selection and ordering strategy can // be customized with the plugin in the future. -func (w *worker) fillTransactions(interrupt *int32, env *environment) { +func (w *worker) fillTransactions(interrupt *int32, env *environment) error { // Split the pending transactions into locals and remotes // Fill the block with all available pending transactions. pending := w.eth.TxPool().Pending(true) @@ -1063,16 +1069,17 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment) { } if len(localTxs) > 0 { txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee) - if w.commitTransactions(env, txs, interrupt) { - return + if err := w.commitTransactions(env, txs, interrupt); err != nil { + return err } } if len(remoteTxs) > 0 { txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee) - if w.commitTransactions(env, txs, interrupt) { - return + if err := w.commitTransactions(env, txs, interrupt); err != nil { + return err } } + return nil } // generateWork generates a sealing block based on the given parameters. @@ -1084,6 +1091,7 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, error) { defer work.discard() w.fillTransactions(nil, work) + return w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts) } @@ -1113,8 +1121,14 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) { if !noempty && atomic.LoadUint32(&w.noempty) == 0 { w.commit(work.copy(), nil, false, start) } + // Fill pending transactions from the txpool - w.fillTransactions(interrupt, work) + err = w.fillTransactions(interrupt, work) + if errors.Is(err, errBlockInterruptedByNewHead) { + work.discard() + return + } + w.commit(work.copy(), w.fullTaskHook, true, start) // Swap out the old work with the new one, terminating any leftover