Patch for concurrent iterator & others (onto v1.11.6) #386

Closed
roysc wants to merge 1565 commits from v1.11.6-statediff-v5 into master
8 changed files with 597 additions and 425 deletions
Showing only changes of commit 78636ee568 - Show all commits

View File

@ -234,7 +234,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
return nil, err
}
eth.miner = miner.New(eth, &config.Miner, chainConfig, eth.EventMux(), eth.engine, eth.isLocalBlock, merger)
eth.miner = miner.New(eth, &config.Miner, chainConfig, eth.EventMux(), eth.engine, eth.isLocalBlock)
eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData))
eth.APIBackend = &EthAPIBackend{stack.Config().ExtRPCEnabled(), stack.Config().AllowUnprotectedTxs, eth, nil}

View File

@ -23,20 +23,14 @@ import (
"errors"
"fmt"
"math/big"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/consensus/beacon"
"github.com/ethereum/go-ethereum/consensus/misc"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/les"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
chainParams "github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/trie"
)
@ -83,97 +77,28 @@ type ConsensusAPI struct {
light bool
eth *eth.Ethereum
les *les.LightEthereum
engine consensus.Engine // engine is the post-merge consensus engine, only for block creation
preparedBlocks *payloadQueue // preparedBlocks caches payloads (*ExecutableDataV1) by payload ID (PayloadID)
}
func NewConsensusAPI(eth *eth.Ethereum, les *les.LightEthereum) *ConsensusAPI {
var engine consensus.Engine
if eth == nil {
if les.BlockChain().Config().TerminalTotalDifficulty == nil {
panic("Catalyst started without valid total difficulty")
}
if b, ok := les.Engine().(*beacon.Beacon); ok {
engine = beacon.New(b.InnerEngine())
} else {
engine = beacon.New(les.Engine())
}
} else {
if eth.BlockChain().Config().TerminalTotalDifficulty == nil {
panic("Catalyst started without valid total difficulty")
}
if b, ok := eth.Engine().(*beacon.Beacon); ok {
engine = beacon.New(b.InnerEngine())
} else {
engine = beacon.New(eth.Engine())
}
}
return &ConsensusAPI{
light: eth == nil,
eth: eth,
les: les,
engine: engine,
preparedBlocks: newPayloadQueue(),
}
}
// blockExecutionEnv gathers all the data required to execute
// a block, either when assembling it or when inserting it.
type blockExecutionEnv struct {
chain *core.BlockChain
state *state.StateDB
tcount int
gasPool *core.GasPool
header *types.Header
txs []*types.Transaction
receipts []*types.Receipt
}
func (env *blockExecutionEnv) commitTransaction(tx *types.Transaction, coinbase common.Address) error {
vmConfig := *env.chain.GetVMConfig()
snap := env.state.Snapshot()
receipt, err := core.ApplyTransaction(env.chain.Config(), env.chain, &coinbase, env.gasPool, env.state, env.header, tx, &env.header.GasUsed, vmConfig)
if err != nil {
env.state.RevertToSnapshot(snap)
return err
}
env.txs = append(env.txs, tx)
env.receipts = append(env.receipts, receipt)
return nil
}
func (api *ConsensusAPI) makeEnv(parent *types.Block, header *types.Header) (*blockExecutionEnv, error) {
// The parent state might be missing. It can be the special scenario
// that consensus layer tries to build a new block based on the very
// old side chain block and the relevant state is already pruned. So
// try to retrieve the live state from the chain, if it's not existent,
// do the necessary recovery work.
var (
err error
state *state.StateDB
)
if api.eth.BlockChain().HasState(parent.Root()) {
state, err = api.eth.BlockChain().StateAt(parent.Root())
} else {
// The maximum acceptable reorg depth can be limited by the
// finalised block somehow. TODO(rjl493456442) fix the hard-
// coded number here later.
state, err = api.eth.StateAtBlock(parent, 1000, nil, false, false)
}
if err != nil {
return nil, err
}
env := &blockExecutionEnv{
chain: api.eth.BlockChain(),
state: state,
header: header,
gasPool: new(core.GasPool).AddGas(header.GasLimit),
}
return env, nil
}
func (api *ConsensusAPI) GetPayloadV1(payloadID PayloadID) (*ExecutableDataV1, error) {
log.Trace("Engine API request received", "method", "GetPayload", "id", payloadID)
data := api.preparedBlocks.get(payloadID)
@ -183,36 +108,51 @@ func (api *ConsensusAPI) GetPayloadV1(payloadID PayloadID) (*ExecutableDataV1, e
return data, nil
}
func (api *ConsensusAPI) ForkchoiceUpdatedV1(heads ForkchoiceStateV1, PayloadAttributes *PayloadAttributesV1) (ForkChoiceResponse, error) {
func (api *ConsensusAPI) ForkchoiceUpdatedV1(heads ForkchoiceStateV1, payloadAttributes *PayloadAttributesV1) (ForkChoiceResponse, error) {
log.Trace("Engine API request received", "method", "ForkChoiceUpdated", "head", heads.HeadBlockHash, "finalized", heads.FinalizedBlockHash, "safe", heads.SafeBlockHash)
if heads.HeadBlockHash == (common.Hash{}) {
return ForkChoiceResponse{Status: SUCCESS.Status, PayloadID: nil}, nil
}
if err := api.checkTerminalTotalDifficulty(heads.HeadBlockHash); err != nil {
if api.light {
if header := api.les.BlockChain().GetHeaderByHash(heads.HeadBlockHash); header == nil {
// TODO (MariusVanDerWijden) trigger sync
return SYNCING, nil
}
return INVALID, err
} else {
if block := api.eth.BlockChain().GetBlockByHash(heads.HeadBlockHash); block == nil {
// TODO (MariusVanDerWijden) trigger sync
return SYNCING, nil
}
return INVALID, err
}
}
// If the finalized block is set, check if it is in our blockchain
if heads.FinalizedBlockHash != (common.Hash{}) {
if api.light {
if header := api.les.BlockChain().GetHeaderByHash(heads.FinalizedBlockHash); header == nil {
// TODO (MariusVanDerWijden) trigger sync
return SYNCING, nil
}
} else {
if block := api.eth.BlockChain().GetBlockByHash(heads.FinalizedBlockHash); block == nil {
// TODO (MariusVanDerWijden) trigger sync
return SYNCING, nil
}
}
}
// SetHead
if err := api.setHead(heads.HeadBlockHash); err != nil {
return INVALID, err
}
// Assemble block (if needed)
if PayloadAttributes != nil {
data, err := api.assembleBlock(heads.HeadBlockHash, PayloadAttributes)
// Assemble block (if needed). It only works for full node.
if !api.light && payloadAttributes != nil {
data, err := api.assembleBlock(heads.HeadBlockHash, payloadAttributes)
if err != nil {
return INVALID, err
}
id := computePayloadId(heads.HeadBlockHash, PayloadAttributes)
id := computePayloadId(heads.HeadBlockHash, payloadAttributes)
api.preparedBlocks.put(id, data)
log.Info("Created payload", "payloadID", id)
return ForkChoiceResponse{Status: SUCCESS.Status, PayloadID: &id}, nil
@ -247,13 +187,28 @@ func (api *ConsensusAPI) ExecutePayloadV1(params ExecutableDataV1) (ExecutePaylo
return api.invalid(), err
}
if api.light {
if !api.les.BlockChain().HasHeader(block.ParentHash(), block.NumberU64()-1) {
/*
TODO (MariusVanDerWijden) reenable once sync is merged
if err := api.eth.Downloader().BeaconSync(api.eth.SyncMode(), block.Header()); err != nil {
return SYNCING, err
}
*/
// TODO (MariusVanDerWijden) we should return nil here not empty hash
return ExecutePayloadResponse{Status: SYNCING.Status, LatestValidHash: common.Hash{}}, nil
}
parent := api.les.BlockChain().GetHeaderByHash(params.ParentHash)
if parent == nil {
return api.invalid(), fmt.Errorf("could not find parent %x", params.ParentHash)
td := api.les.BlockChain().GetTd(parent.Hash(), block.NumberU64()-1)
ttd := api.les.BlockChain().Config().TerminalTotalDifficulty
if td.Cmp(ttd) < 0 {
return api.invalid(), fmt.Errorf("can not execute payload on top of block with low td got: %v threshold %v", td, ttd)
}
if err = api.les.BlockChain().InsertHeader(block.Header()); err != nil {
return api.invalid(), err
}
if merger := api.merger(); !merger.TDDReached() {
merger.ReachTTD()
}
return ExecutePayloadResponse{Status: VALID.Status, LatestValidHash: block.Hash()}, nil
}
if !api.eth.BlockChain().HasBlock(block.ParentHash(), block.NumberU64()-1) {
@ -290,99 +245,11 @@ func (api *ConsensusAPI) assembleBlock(parentHash common.Hash, params *PayloadAt
return nil, errors.New("not supported")
}
log.Info("Producing block", "parentHash", parentHash)
bc := api.eth.BlockChain()
parent := bc.GetBlockByHash(parentHash)
if parent == nil {
log.Warn("Cannot assemble block with parent hash to unknown block", "parentHash", parentHash)
return nil, fmt.Errorf("cannot assemble block with unknown parent %s", parentHash)
}
if params.Timestamp <= parent.Time() {
return nil, fmt.Errorf("invalid timestamp: child's %d <= parent's %d", params.Timestamp, parent.Time())
}
if now := uint64(time.Now().Unix()); params.Timestamp > now+1 {
diff := time.Duration(params.Timestamp-now) * time.Second
log.Warn("Producing block too far in the future", "diff", common.PrettyDuration(diff))
}
pending := api.eth.TxPool().Pending(true)
coinbase := params.SuggestedFeeRecipient
num := parent.Number()
header := &types.Header{
ParentHash: parent.Hash(),
Number: num.Add(num, common.Big1),
Coinbase: coinbase,
GasLimit: parent.GasLimit(), // Keep the gas limit constant in this prototype
Extra: []byte{}, // TODO (MariusVanDerWijden) properly set extra data
Time: params.Timestamp,
MixDigest: params.Random,
}
if config := api.eth.BlockChain().Config(); config.IsLondon(header.Number) {
header.BaseFee = misc.CalcBaseFee(config, parent.Header())
}
if err := api.engine.Prepare(bc, header); err != nil {
return nil, err
}
env, err := api.makeEnv(parent, header)
block, err := api.eth.Miner().GetSealingBlock(parentHash, params.Timestamp, params.SuggestedFeeRecipient, params.Random)
if err != nil {
return nil, err
}
var (
signer = types.MakeSigner(bc.Config(), header.Number)
txHeap = types.NewTransactionsByPriceAndNonce(signer, pending, nil)
transactions []*types.Transaction
)
for {
if env.gasPool.Gas() < chainParams.TxGas {
log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", chainParams.TxGas)
break
}
tx := txHeap.Peek()
if tx == nil {
break
}
// The sender is only for logging purposes, and it doesn't really matter if it's correct.
from, _ := types.Sender(signer, tx)
// Execute the transaction
env.state.Prepare(tx.Hash(), env.tcount)
err = env.commitTransaction(tx, coinbase)
switch err {
case core.ErrGasLimitReached:
// Pop the current out-of-gas transaction without shifting in the next from the account
log.Trace("Gas limit exceeded for current block", "sender", from)
txHeap.Pop()
case core.ErrNonceTooLow:
// New head notification data race between the transaction pool and miner, shift
log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce())
txHeap.Shift()
case core.ErrNonceTooHigh:
// Reorg notification data race between the transaction pool and miner, skip account =
log.Trace("Skipping account with high nonce", "sender", from, "nonce", tx.Nonce())
txHeap.Pop()
case nil:
// Everything ok, collect the logs and shift in the next transaction from the same account
env.tcount++
txHeap.Shift()
transactions = append(transactions, tx)
default:
// Strange error, discard the transaction and get the next in line (note, the
// nonce-too-high clause will prevent us from executing in vain).
log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err)
txHeap.Shift()
}
}
// Create the block.
block, err := api.engine.FinalizeAndAssemble(bc, header, env.state, transactions, nil /* uncles */, env.receipts)
if err != nil {
return nil, err
}
return BlockToExecutableData(block, params.Random), nil
return BlockToExecutableData(block), nil
}
func encodeTransactions(txs []*types.Transaction) [][]byte {
@ -413,8 +280,6 @@ func ExecutableDataToBlock(params ExecutableDataV1) (*types.Block, error) {
if len(params.ExtraData) > 32 {
return nil, fmt.Errorf("invalid extradata length: %v", len(params.ExtraData))
}
number := big.NewInt(0)
number.SetUint64(params.Number)
header := &types.Header{
ParentHash: params.ParentHash,
UncleHash: types.EmptyUncleHash,
@ -424,7 +289,7 @@ func ExecutableDataToBlock(params ExecutableDataV1) (*types.Block, error) {
ReceiptHash: params.ReceiptsRoot,
Bloom: types.BytesToBloom(params.LogsBloom),
Difficulty: common.Big0,
Number: number,
Number: new(big.Int).SetUint64(params.Number),
GasLimit: params.GasLimit,
GasUsed: params.GasUsed,
Time: params.Timestamp,
@ -439,7 +304,9 @@ func ExecutableDataToBlock(params ExecutableDataV1) (*types.Block, error) {
return block, nil
}
func BlockToExecutableData(block *types.Block, random common.Hash) *ExecutableDataV1 {
// BlockToExecutableData constructs the executableDataV1 structure by filling the
// fields from the given block. It assumes the given block is post-merge block.
func BlockToExecutableData(block *types.Block) *ExecutableDataV1 {
return &ExecutableDataV1{
BlockHash: block.Hash(),
ParentHash: block.ParentHash(),
@ -453,7 +320,7 @@ func BlockToExecutableData(block *types.Block, random common.Hash) *ExecutableDa
ReceiptsRoot: block.ReceiptHash(),
LogsBloom: block.Bloom().Bytes(),
Transactions: encodeTransactions(block.Transactions()),
Random: random,
Random: block.MixDigest(),
ExtraData: block.Extra(),
}
}
@ -471,6 +338,18 @@ func (api *ConsensusAPI) checkTerminalTotalDifficulty(head common.Hash) error {
if api.merger().PoSFinalized() {
return nil
}
if api.light {
// make sure the parent has enough terminal total difficulty
header := api.les.BlockChain().GetHeaderByHash(head)
if header == nil {
return &GenericServerError
}
td := api.les.BlockChain().GetTd(header.Hash(), header.Number.Uint64())
if td != nil && td.Cmp(api.les.BlockChain().Config().TerminalTotalDifficulty) < 0 {
return &InvalidTB
}
return nil
}
// make sure the parent has enough terminal total difficulty
newHeadBlock := api.eth.BlockChain().GetBlockByHash(head)
if newHeadBlock == nil {
@ -499,8 +378,7 @@ func (api *ConsensusAPI) setHead(newHead common.Hash) error {
return err
}
// Trigger the transition if it's the first `NewHead` event.
merger := api.merger()
if !merger.PoSFinalized() {
if merger := api.merger(); !merger.PoSFinalized() {
merger.FinalizePoS()
}
return nil

View File

@ -26,7 +26,7 @@ import (
//go:generate go run github.com/fjl/gencodec -type PayloadAttributesV1 -field-override payloadAttributesMarshaling -out gen_blockparams.go
// Structure described at https://github.com/ethereum/execution-apis/pull/74
// PayloadAttributesV1 structure described at https://github.com/ethereum/execution-apis/pull/74
type PayloadAttributesV1 struct {
Timestamp uint64 `json:"timestamp" gencodec:"required"`
Random common.Hash `json:"random" gencodec:"required"`
@ -40,7 +40,7 @@ type payloadAttributesMarshaling struct {
//go:generate go run github.com/fjl/gencodec -type ExecutableDataV1 -field-override executableDataMarshaling -out gen_ed.go
// Structure described at https://github.com/ethereum/execution-apis/src/engine/specification.md
// ExecutableDataV1 structure described at https://github.com/ethereum/execution-apis/src/engine/specification.md
type ExecutableDataV1 struct {
ParentHash common.Hash `json:"parentHash" gencodec:"required"`
FeeRecipient common.Address `json:"feeRecipient" gencodec:"required"`

View File

@ -35,10 +35,12 @@ import (
"github.com/ethereum/go-ethereum/params"
)
// Backend wraps all methods required for mining.
// Backend wraps all methods required for mining. Only full node is capable
// to offer all the functions here.
type Backend interface {
BlockChain() *core.BlockChain
TxPool() *core.TxPool
StateAtBlock(block *types.Block, reexec uint64, base *state.StateDB, checkLive bool, preferDisk bool) (statedb *state.StateDB, err error)
}
// Config is the configuration parameters of mining.
@ -68,7 +70,7 @@ type Miner struct {
wg sync.WaitGroup
}
func New(eth Backend, config *Config, chainConfig *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine, isLocalBlock func(header *types.Header) bool, merger *consensus.Merger) *Miner {
func New(eth Backend, config *Config, chainConfig *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine, isLocalBlock func(header *types.Header) bool) *Miner {
miner := &Miner{
eth: eth,
mux: mux,
@ -76,7 +78,7 @@ func New(eth Backend, config *Config, chainConfig *params.ChainConfig, mux *even
exitCh: make(chan struct{}),
startCh: make(chan common.Address),
stopCh: make(chan struct{}),
worker: newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, true, merger),
worker: newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, true),
}
miner.wg.Add(1)
go miner.update()
@ -233,6 +235,12 @@ func (miner *Miner) DisablePreseal() {
miner.worker.disablePreseal()
}
// GetSealingBlock retrieves a sealing block based on the given parameters.
// The returned block is not sealed but all other fields should be filled.
func (miner *Miner) GetSealingBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash) (*types.Block, error) {
return miner.worker.getSealingBlock(parent, timestamp, coinbase, random)
}
// SubscribePendingLogs starts delivering logs from pending transactions
// to the given channel.
func (miner *Miner) SubscribePendingLogs(ch chan<- []*types.Log) event.Subscription {

View File

@ -18,11 +18,11 @@
package miner
import (
"errors"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/consensus/clique"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
@ -55,6 +55,10 @@ func (m *mockBackend) TxPool() *core.TxPool {
return m.txPool
}
func (m *mockBackend) StateAtBlock(block *types.Block, reexec uint64, base *state.StateDB, checkLive bool, preferDisk bool) (statedb *state.StateDB, err error) {
return nil, errors.New("not supported")
}
type testBlockChain struct {
statedb *state.StateDB
gasLimit uint64
@ -253,7 +257,6 @@ func createMiner(t *testing.T) (*Miner, *event.TypeMux, func(skipMiner bool)) {
// Create consensus engine
engine := clique.New(chainConfig.Clique, chainDB)
// Create Ethereum backend
merger := consensus.NewMerger(rawdb.NewMemoryDatabase())
bc, err := core.NewBlockChain(chainDB, nil, chainConfig, engine, vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("can't create new chain %v", err)
@ -266,7 +269,7 @@ func createMiner(t *testing.T) (*Miner, *event.TypeMux, func(skipMiner bool)) {
// Create event Mux
mux := new(event.TypeMux)
// Create Miner
miner := New(backend, &config, chainConfig, mux, engine, nil, merger)
miner := New(backend, &config, chainConfig, mux, engine, nil)
cleanup := func(skipMiner bool) {
bc.Stop()
engine.Close()

View File

@ -141,8 +141,14 @@ func (n *ethNode) assembleBlock(parentHash common.Hash, parentTimestamp uint64)
if n.typ != eth2MiningNode {
return nil, errors.New("invalid node type")
}
timestamp := uint64(time.Now().Unix())
if timestamp <= parentTimestamp {
timestamp = parentTimestamp + 1
}
payloadAttribute := catalyst.PayloadAttributesV1{
Timestamp: uint64(time.Now().Unix()),
Timestamp: timestamp,
Random: common.Hash{},
SuggestedFeeRecipient: common.HexToAddress("0xdeadbeef"),
}
fcState := catalyst.ForkchoiceStateV1{
HeadBlockHash: parentHash,
@ -287,9 +293,12 @@ func (mgr *nodeManager) run() {
fcState := catalyst.ForkchoiceStateV1{
HeadBlockHash: oldest.Hash(),
SafeBlockHash: common.Hash{},
FinalizedBlockHash: common.Hash{},
FinalizedBlockHash: oldest.Hash(),
}
node.api.ForkchoiceUpdatedV1(fcState, nil)
// TODO(rjl493456442) finalization doesn't work properly, FIX IT
_ = fcState
_ = node
//node.api.ForkchoiceUpdatedV1(fcState, nil)
}
log.Info("Finalised eth2 block", "number", oldest.NumberU64(), "hash", oldest.Hash())
waitFinalise = waitFinalise[1:]
@ -331,13 +340,16 @@ func (mgr *nodeManager) run() {
nodes := mgr.getNodes(eth2MiningNode)
nodes = append(nodes, mgr.getNodes(eth2NormalNode)...)
nodes = append(nodes, mgr.getNodes(eth2LightClient)...)
for _, node := range nodes {
if err := node.insertBlockAndSetHead(parentBlock.Header(), *ed); err != nil {
log.Error("Failed to insert block", "type", node.typ, "err", err)
}
}
for _, node := range mgr.getNodes(eth2LightClient) {
if err := node.insertBlock(*ed); err != nil {
log.Error("Failed to insert block", "type", node.typ, "err", err)
}
}
log.Info("Create and insert eth2 block", "number", ed.Number)
parentBlock = block
waitFinalise = append(waitFinalise, block)
@ -410,9 +422,8 @@ func makeGenesis(faucets []*ecdsa.PrivateKey) *core.Genesis {
genesis.Difficulty = params.MinimumDifficulty
genesis.GasLimit = 25000000
genesis.Config.ChainID = big.NewInt(18)
genesis.Config.EIP150Hash = common.Hash{}
genesis.BaseFee = big.NewInt(params.InitialBaseFee)
genesis.Config = params.AllEthashProtocolChanges
genesis.Config.TerminalTotalDifficulty = transitionDifficulty
genesis.Alloc = core.GenesisAlloc{}

View File

@ -17,8 +17,8 @@
package miner
import (
"bytes"
"errors"
"fmt"
"math/big"
"sync"
"sync/atomic"
@ -54,14 +54,14 @@ const (
// resubmitAdjustChanSize is the size of resubmitting interval adjustment channel.
resubmitAdjustChanSize = 10
// miningLogAtDepth is the number of confirmations before logging successful mining.
miningLogAtDepth = 7
// sealingLogAtDepth is the number of confirmations before logging successful sealing.
sealingLogAtDepth = 7
// minRecommitInterval is the minimal time interval to recreate the mining block with
// minRecommitInterval is the minimal time interval to recreate the sealing block with
// any newly arrived transactions.
minRecommitInterval = 1 * time.Second
// maxRecommitInterval is the maximum time interval to recreate the mining block with
// maxRecommitInterval is the maximum time interval to recreate the sealing block with
// any newly arrived transactions.
maxRecommitInterval = 15 * time.Second
@ -77,20 +77,68 @@ const (
staleThreshold = 7
)
// environment is the worker's current environment and holds all of the current state information.
// environment is the worker's current environment and holds all
// information of the sealing block generation.
type environment struct {
signer types.Signer
state *state.StateDB // apply state changes here
ancestors mapset.Set // ancestor set (used for checking uncle parent validity)
family mapset.Set // family set (used for checking uncle invalidity)
uncles mapset.Set // uncle set
tcount int // tx count in cycle
gasPool *core.GasPool // available gas used to pack transactions
coinbase common.Address
header *types.Header
txs []*types.Transaction
receipts []*types.Receipt
uncles map[common.Hash]*types.Header
}
// copy creates a deep copy of environment.
func (env *environment) copy() *environment {
cpy := &environment{
signer: env.signer,
state: env.state.Copy(),
ancestors: env.ancestors.Clone(),
family: env.family.Clone(),
tcount: env.tcount,
coinbase: env.coinbase,
header: types.CopyHeader(env.header),
receipts: copyReceipts(env.receipts),
}
if env.gasPool != nil {
gasPool := *env.gasPool
cpy.gasPool = &gasPool
}
// The content of txs and uncles are immutable, unnecessary
// to do the expensive deep copy for them.
cpy.txs = make([]*types.Transaction, len(env.txs))
copy(cpy.txs, env.txs)
cpy.uncles = make(map[common.Hash]*types.Header)
for hash, uncle := range env.uncles {
cpy.uncles[hash] = uncle
}
return cpy
}
// unclelist returns the contained uncles as the list format.
func (env *environment) unclelist() []*types.Header {
var uncles []*types.Header
for _, uncle := range env.uncles {
uncles = append(uncles, uncle)
}
return uncles
}
// discard terminates the background prefetcher go-routine. It should
// always be called for all created environment instances otherwise
// the go-routine leak can happen.
func (env *environment) discard() {
if env.state == nil {
return
}
env.state.StopPrefetcher()
}
// task contains all information for consensus engine sealing and result submitting.
@ -114,6 +162,13 @@ type newWorkReq struct {
timestamp int64
}
// getWorkReq represents a request for getting a new sealing work with provided parameters.
type getWorkReq struct {
params *generateParams
err error
result chan *types.Block
}
// intervalAdjust represents a resubmitting interval adjustment.
type intervalAdjust struct {
ratio float64
@ -128,7 +183,6 @@ type worker struct {
engine consensus.Engine
eth Backend
chain *core.BlockChain
merger *consensus.Merger
// Feeds
pendingLogsFeed event.Feed
@ -144,6 +198,7 @@ type worker struct {
// Channels
newWorkCh chan *newWorkReq
getWorkCh chan *getWorkReq
taskCh chan *task
resultCh chan *types.Block
startCh chan struct{}
@ -191,7 +246,7 @@ type worker struct {
resubmitHook func(time.Duration, time.Duration) // Method to call upon updating resubmitting interval.
}
func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(header *types.Header) bool, init bool, merger *consensus.Merger) *worker {
func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(header *types.Header) bool, init bool) *worker {
worker := &worker{
config: config,
chainConfig: chainConfig,
@ -199,16 +254,16 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus
eth: eth,
mux: mux,
chain: eth.BlockChain(),
merger: merger,
isLocalBlock: isLocalBlock,
localUncles: make(map[common.Hash]*types.Block),
remoteUncles: make(map[common.Hash]*types.Block),
unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), sealingLogAtDepth),
pendingTasks: make(map[common.Hash]*task),
txsCh: make(chan core.NewTxsEvent, txChanSize),
chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize),
newWorkCh: make(chan *newWorkReq),
getWorkCh: make(chan *getWorkReq),
taskCh: make(chan *task),
resultCh: make(chan *types.Block, resultQueueSize),
exitCh: make(chan struct{}),
@ -264,15 +319,18 @@ func (w *worker) setExtra(extra []byte) {
// setRecommitInterval updates the interval for miner sealing work recommitting.
func (w *worker) setRecommitInterval(interval time.Duration) {
w.resubmitIntervalCh <- interval
select {
case w.resubmitIntervalCh <- interval:
case <-w.exitCh:
}
}
// disablePreseal disables pre-sealing mining feature
// disablePreseal disables pre-sealing feature
func (w *worker) disablePreseal() {
atomic.StoreUint32(&w.noempty, 1)
}
// enablePreseal enables pre-sealing mining feature
// enablePreseal enables pre-sealing feature
func (w *worker) enablePreseal() {
atomic.StoreUint32(&w.noempty, 0)
}
@ -350,13 +408,13 @@ func recalcRecommit(minRecommit, prev time.Duration, target float64, inc bool) t
return time.Duration(int64(next))
}
// newWorkLoop is a standalone goroutine to submit new mining work upon received events.
// newWorkLoop is a standalone goroutine to submit new sealing work upon received events.
func (w *worker) newWorkLoop(recommit time.Duration) {
defer w.wg.Done()
var (
interrupt *int32
minRecommit = recommit // minimal resubmit interval specified by user.
timestamp int64 // timestamp for each round of mining.
timestamp int64 // timestamp for each round of sealing.
)
timer := time.NewTimer(0)
@ -401,7 +459,7 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
commit(false, commitInterruptNewHead)
case <-timer.C:
// If mining is running resubmit a new work cycle periodically to pull in
// If sealing is running resubmit a new work cycle periodically to pull in
// higher priced transactions. Disable this overhead for pending blocks.
if w.isRunning() && (w.chainConfig.Clique == nil || w.chainConfig.Clique.Period > 0) {
// Short circuit if no new transaction arrives.
@ -448,22 +506,36 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
}
}
// mainLoop is a standalone goroutine to regenerate the sealing task based on the received event.
// mainLoop is responsible for generating and submitting sealing work based on
// the received event. It can support two modes: automatically generate task and
// submit it or return task according to given parameters for various proposes.
func (w *worker) mainLoop() {
defer w.wg.Done()
defer w.txsSub.Unsubscribe()
defer w.chainHeadSub.Unsubscribe()
defer w.chainSideSub.Unsubscribe()
defer func() {
if w.current != nil && w.current.state != nil {
w.current.state.StopPrefetcher()
if w.current != nil {
w.current.discard()
}
}()
cleanTicker := time.NewTicker(time.Second * 10)
defer cleanTicker.Stop()
for {
select {
case req := <-w.newWorkCh:
w.commitNewWork(req.interrupt, req.noempty, req.timestamp)
w.commitWork(req.interrupt, req.noempty, req.timestamp)
case req := <-w.getWorkCh:
block, err := w.generateWork(req.params)
if err != nil {
req.err = err
req.result <- nil
} else {
req.result <- block
}
case ev := <-w.chainSideCh:
// Short circuit for duplicate side blocks
@ -479,46 +551,40 @@ func (w *worker) mainLoop() {
} else {
w.remoteUncles[ev.Block.Hash()] = ev.Block
}
// If our mining block contains less than 2 uncle blocks,
// add the new uncle block if valid and regenerate a mining block.
if w.isRunning() && w.current != nil && w.current.uncles.Cardinality() < 2 {
// If our sealing block contains less than 2 uncle blocks,
// add the new uncle block if valid and regenerate a new
// sealing block for higher profit.
if w.isRunning() && w.current != nil && len(w.current.uncles) < 2 {
start := time.Now()
if err := w.commitUncle(w.current, ev.Block.Header()); err == nil {
var uncles []*types.Header
w.current.uncles.Each(func(item interface{}) bool {
hash, ok := item.(common.Hash)
if !ok {
return false
w.commit(w.current.copy(), nil, true, start)
}
uncle, exist := w.localUncles[hash]
if !exist {
uncle, exist = w.remoteUncles[hash]
}
if !exist {
return false
case <-cleanTicker.C:
chainHead := w.chain.CurrentBlock()
for hash, uncle := range w.localUncles {
if uncle.NumberU64()+staleThreshold <= chainHead.NumberU64() {
delete(w.localUncles, hash)
}
uncles = append(uncles, uncle.Header())
return false
})
w.commit(uncles, nil, true, start)
}
for hash, uncle := range w.remoteUncles {
if uncle.NumberU64()+staleThreshold <= chainHead.NumberU64() {
delete(w.remoteUncles, hash)
}
}
case ev := <-w.txsCh:
// Apply transactions to the pending state if we're not mining.
// Apply transactions to the pending state if we're not sealing
//
// Note all transactions received may not be continuous with transactions
// already included in the current mining block. These transactions will
// already included in the current sealing block. These transactions will
// be automatically eliminated.
if !w.isRunning() && w.current != nil {
// If block is already full, abort
if gp := w.current.gasPool; gp != nil && gp.Gas() < params.TxGas {
continue
}
w.mu.RLock()
coinbase := w.coinbase
w.mu.RUnlock()
txs := make(map[common.Address]types.Transactions)
for _, tx := range ev.Txs {
acc, _ := types.Sender(w.current.signer, tx)
@ -526,18 +592,19 @@ func (w *worker) mainLoop() {
}
txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs, w.current.header.BaseFee)
tcount := w.current.tcount
w.commitTransactions(txset, coinbase, nil)
// Only update the snapshot if any new transactons were added
w.commitTransactions(w.current, txset, nil)
// Only update the snapshot if any new transactions were added
// to the pending block
if tcount != w.current.tcount {
w.updateSnapshot()
w.updateSnapshot(w.current)
}
} else {
// Special case, if the consensus engine is 0 period clique(dev mode),
// submit mining work here since all empty submission will be rejected
// submit sealing work here since all empty submission will be rejected
// by clique. Of course the advance sealing(empty submission) is disabled.
if w.chainConfig.Clique != nil && w.chainConfig.Clique.Period == 0 {
w.commitNewWork(nil, true, time.Now().Unix())
w.commitWork(nil, true, time.Now().Unix())
}
}
atomic.AddInt32(&w.newTxs, int32(len(ev.Txs)))
@ -679,23 +746,35 @@ func (w *worker) resultLoop() {
}
}
// makeCurrent creates a new environment for the current cycle.
func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error {
// makeEnv creates a new environment for the sealing block.
func (w *worker) makeEnv(parent *types.Block, header *types.Header, coinbase common.Address) (*environment, error) {
// Retrieve the parent state to execute on top and start a prefetcher for
// the miner to speed block sealing up a bit
// the miner to speed block sealing up a bit.
state, err := w.chain.StateAt(parent.Root())
if err != nil {
return err
// Note since the sealing block can be created upon the arbitrary parent
// block, but the state of parent block may already be pruned, so the necessary
// state recovery is needed here in the future.
//
// The maximum acceptable reorg depth can be limited by the finalised block
// somehow. TODO(rjl493456442) fix the hard-coded number here later.
state, err = w.eth.StateAtBlock(parent, 1024, nil, false, false)
log.Warn("Recovered mining state", "root", parent.Root(), "err", err)
}
if err != nil {
return nil, err
}
state.StartPrefetcher("miner")
// Note the passed coinbase may be different with header.Coinbase.
env := &environment{
signer: types.MakeSigner(w.chainConfig, header.Number),
state: state,
coinbase: coinbase,
ancestors: mapset.NewSet(),
family: mapset.NewSet(),
uncles: mapset.NewSet(),
header: header,
uncles: make(map[common.Hash]*types.Header),
}
// when 08 is processed ancestors contain 07 (quick block)
for _, ancestor := range w.chain.GetBlocksFromHash(parent.Hash(), 7) {
@ -707,20 +786,16 @@ func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error {
}
// Keep track of transactions which return errors so they can be removed
env.tcount = 0
// Swap out the old work with the new one, terminating any leftover prefetcher
// processes in the mean time and starting a new one.
if w.current != nil && w.current.state != nil {
w.current.state.StopPrefetcher()
}
w.current = env
return nil
return env, nil
}
// commitUncle adds the given block to uncle block set, returns error if failed to add.
func (w *worker) commitUncle(env *environment, uncle *types.Header) error {
if w.isTTDReached(env.header) {
return errors.New("ignore uncle for beacon block")
}
hash := uncle.Hash()
if env.uncles.Contains(hash) {
if _, exist := env.uncles[hash]; exist {
return errors.New("uncle not unique")
}
if env.header.ParentHash == uncle.ParentHash {
@ -732,82 +807,58 @@ func (w *worker) commitUncle(env *environment, uncle *types.Header) error {
if env.family.Contains(hash) {
return errors.New("uncle already included")
}
env.uncles.Add(uncle.Hash())
env.uncles[hash] = uncle
return nil
}
// updateSnapshot updates pending snapshot block and state.
// Note this function assumes the current variable is thread safe.
func (w *worker) updateSnapshot() {
// updateSnapshot updates pending snapshot block, receipts and state.
func (w *worker) updateSnapshot(env *environment) {
w.snapshotMu.Lock()
defer w.snapshotMu.Unlock()
var uncles []*types.Header
w.current.uncles.Each(func(item interface{}) bool {
hash, ok := item.(common.Hash)
if !ok {
return false
}
uncle, exist := w.localUncles[hash]
if !exist {
uncle, exist = w.remoteUncles[hash]
}
if !exist {
return false
}
uncles = append(uncles, uncle.Header())
return false
})
w.snapshotBlock = types.NewBlock(
w.current.header,
w.current.txs,
uncles,
w.current.receipts,
env.header,
env.txs,
env.unclelist(),
env.receipts,
trie.NewStackTrie(nil),
)
w.snapshotReceipts = copyReceipts(w.current.receipts)
w.snapshotState = w.current.state.Copy()
w.snapshotReceipts = copyReceipts(env.receipts)
w.snapshotState = env.state.Copy()
}
func (w *worker) commitTransaction(tx *types.Transaction, coinbase common.Address) ([]*types.Log, error) {
snap := w.current.state.Snapshot()
func (w *worker) commitTransaction(env *environment, tx *types.Transaction) ([]*types.Log, error) {
snap := env.state.Snapshot()
receipt, err := core.ApplyTransaction(w.chainConfig, w.chain, &coinbase, w.current.gasPool, w.current.state, w.current.header, tx, &w.current.header.GasUsed, *w.chain.GetVMConfig())
receipt, err := core.ApplyTransaction(w.chainConfig, w.chain, &env.coinbase, env.gasPool, env.state, env.header, tx, &env.header.GasUsed, *w.chain.GetVMConfig())
if err != nil {
w.current.state.RevertToSnapshot(snap)
env.state.RevertToSnapshot(snap)
return nil, err
}
w.current.txs = append(w.current.txs, tx)
w.current.receipts = append(w.current.receipts, receipt)
env.txs = append(env.txs, tx)
env.receipts = append(env.receipts, receipt)
return receipt.Logs, nil
}
func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coinbase common.Address, interrupt *int32) bool {
// Short circuit if current is nil
if w.current == nil {
return true
func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32) bool {
gasLimit := env.header.GasLimit
if env.gasPool == nil {
env.gasPool = new(core.GasPool).AddGas(gasLimit)
}
gasLimit := w.current.header.GasLimit
if w.current.gasPool == nil {
w.current.gasPool = new(core.GasPool).AddGas(gasLimit)
}
var coalescedLogs []*types.Log
for {
// In the following three cases, we will interrupt the execution of the transaction.
// (1) new head block event arrival, the interrupt signal is 1
// (2) worker start or restart, the interrupt signal is 1
// (3) worker recreate the mining block with any newly arrived transactions, the interrupt signal is 2.
// (3) worker recreate the sealing block with any newly arrived transactions, the interrupt signal is 2.
// For the first two cases, the semi-finished work will be discarded.
// For the third case, the semi-finished work will be submitted to the consensus engine.
if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone {
// Notify resubmit loop to increase resubmitting interval due to too frequent commits.
if atomic.LoadInt32(interrupt) == commitInterruptResubmit {
ratio := float64(gasLimit-w.current.gasPool.Gas()) / float64(gasLimit)
ratio := float64(gasLimit-env.gasPool.Gas()) / float64(gasLimit)
if ratio < 0.1 {
ratio = 0.1
}
@ -819,8 +870,8 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
return atomic.LoadInt32(interrupt) == commitInterruptNewHead
}
// If we don't have enough gas for any further transactions then we're done
if w.current.gasPool.Gas() < params.TxGas {
log.Trace("Not enough gas for further transactions", "have", w.current.gasPool, "want", params.TxGas)
if env.gasPool.Gas() < params.TxGas {
log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas)
break
}
// Retrieve the next transaction and abort if all done
@ -832,19 +883,19 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
// during transaction acceptance is the transaction pool.
//
// We use the eip155 signer regardless of the current hf.
from, _ := types.Sender(w.current.signer, tx)
from, _ := types.Sender(env.signer, tx)
// Check whether the tx is replay protected. If we're not in the EIP155 hf
// phase, start ignoring the sender until we do.
if tx.Protected() && !w.chainConfig.IsEIP155(w.current.header.Number) {
if tx.Protected() && !w.chainConfig.IsEIP155(env.header.Number) {
log.Trace("Ignoring reply protected transaction", "hash", tx.Hash(), "eip155", w.chainConfig.EIP155Block)
txs.Pop()
continue
}
// Start executing the transaction
w.current.state.Prepare(tx.Hash(), w.current.tcount)
env.state.Prepare(tx.Hash(), env.tcount)
logs, err := w.commitTransaction(tx, coinbase)
logs, err := w.commitTransaction(env, tx)
switch {
case errors.Is(err, core.ErrGasLimitReached):
// Pop the current out-of-gas transaction without shifting in the next from the account
@ -864,7 +915,7 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
case errors.Is(err, nil):
// Everything ok, collect the logs and shift in the next transaction from the same account
coalescedLogs = append(coalescedLogs, logs...)
w.current.tcount++
env.tcount++
txs.Shift()
case errors.Is(err, core.ErrTxTypeNotSupported):
@ -881,8 +932,8 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
}
if !w.isRunning() && len(coalescedLogs) > 0 {
// We don't push the pendingLogsEvent while we are mining. The reason is that
// when we are mining, the worker will regenerate a mining block every 3 seconds.
// We don't push the pendingLogsEvent while we are sealing. The reason is that
// when we are sealing, the worker will regenerate a sealing block every 3 seconds.
// In order to avoid pushing the repeated pendingLog, we disable the pending log pushing.
// make a copy, the state caches the logs and these logs get "upgraded" from pending to mined
@ -903,24 +954,56 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
return false
}
// commitNewWork generates several new sealing tasks based on the parent block.
func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) {
// generateParams wraps various of settings for generating sealing task.
type generateParams struct {
timestamp uint64 // The timstamp for sealing task
forceTime bool // Flag whether the given timestamp is immutable or not
parentHash common.Hash // Parent block hash, empty means the latest chain head
coinbase common.Address // The fee recipient address for including transaction
random common.Hash // The randomness generated by beacon chain, empty before the merge
noUncle bool // Flag whether the uncle block inclusion is allowed
noExtra bool // Flag whether the extra field assignment is allowed
}
// prepareWork constructs the sealing task according to the given parameters,
// either based on the last chain head or specified parent. In this function
// the pending transactions are not filled yet, only the empty task returned.
func (w *worker) prepareWork(genParams *generateParams) (*environment, error) {
w.mu.RLock()
defer w.mu.RUnlock()
tstart := time.Now()
// Find the parent block for sealing task
parent := w.chain.CurrentBlock()
if parent.Time() >= uint64(timestamp) {
timestamp = int64(parent.Time() + 1)
if genParams.parentHash != (common.Hash{}) {
parent = w.chain.GetBlockByHash(genParams.parentHash)
}
if parent == nil {
return nil, fmt.Errorf("missing parent")
}
// Sanity check the timestamp correctness, recap the timestamp
// to parent+1 if the mutation is allowed.
timestamp := genParams.timestamp
if parent.Time() >= timestamp {
if genParams.forceTime {
return nil, fmt.Errorf("invalid timestamp, parent %d given %d", parent.Time(), timestamp)
}
timestamp = parent.Time() + 1
}
// Construct the sealing block header, set the extra field if it's allowed
num := parent.Number()
header := &types.Header{
ParentHash: parent.Hash(),
Number: num.Add(num, common.Big1),
GasLimit: core.CalcGasLimit(parent.GasLimit(), w.config.GasCeil),
Extra: w.extra,
Time: uint64(timestamp),
Time: timestamp,
Coinbase: genParams.coinbase,
}
if !genParams.noExtra && len(w.extra) != 0 {
header.Extra = w.extra
}
// Set the randomness field from the beacon chain if it's available.
if genParams.random != (common.Hash{}) {
header.MixDigest = genParams.random
}
// Set baseFee and GasLimit if we are on an EIP-1559 chain
if w.chainConfig.IsLondon(header.Number) {
@ -930,83 +1013,47 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64)
header.GasLimit = core.CalcGasLimit(parentGasLimit, w.config.GasCeil)
}
}
// Only set the coinbase if our consensus engine is running (avoid spurious block rewards)
if w.isRunning() {
if w.coinbase == (common.Address{}) {
log.Error("Refusing to mine without etherbase")
return
}
header.Coinbase = w.coinbase
}
// Run the consensus preparation with the default or customized consensus engine.
if err := w.engine.Prepare(w.chain, header); err != nil {
log.Error("Failed to prepare header for mining", "err", err)
return
}
// If we are care about TheDAO hard-fork check whether to override the extra-data or not
if daoBlock := w.chainConfig.DAOForkBlock; daoBlock != nil {
// Check whether the block is among the fork extra-override range
limit := new(big.Int).Add(daoBlock, params.DAOForkExtraRange)
if header.Number.Cmp(daoBlock) >= 0 && header.Number.Cmp(limit) < 0 {
// Depending whether we support or oppose the fork, override differently
if w.chainConfig.DAOForkSupport {
header.Extra = common.CopyBytes(params.DAOForkBlockExtra)
} else if bytes.Equal(header.Extra, params.DAOForkBlockExtra) {
header.Extra = []byte{} // If miner opposes, don't let it use the reserved extra-data
}
}
log.Error("Failed to prepare header for sealing", "err", err)
return nil, err
}
// Could potentially happen if starting to mine in an odd state.
err := w.makeCurrent(parent, header)
// Note genParams.coinbase can be different with header.Coinbase
// since clique algorithm can modify the coinbase field in header.
env, err := w.makeEnv(parent, header, genParams.coinbase)
if err != nil {
log.Error("Failed to create mining context", "err", err)
return
log.Error("Failed to create sealing context", "err", err)
return nil, err
}
// Create the current work task and check any fork transitions needed
env := w.current
if w.chainConfig.DAOForkSupport && w.chainConfig.DAOForkBlock != nil && w.chainConfig.DAOForkBlock.Cmp(header.Number) == 0 {
misc.ApplyDAOHardFork(env.state)
}
// Accumulate the uncles for the current block
uncles := make([]*types.Header, 0, 2)
// Accumulate the uncles for the sealing work only if it's allowed.
if !genParams.noUncle {
commitUncles := func(blocks map[common.Hash]*types.Block) {
// Clean up stale uncle blocks first
for hash, uncle := range blocks {
if uncle.NumberU64()+staleThreshold <= header.Number.Uint64() {
delete(blocks, hash)
}
}
for hash, uncle := range blocks {
if len(uncles) == 2 {
if len(env.uncles) == 2 {
break
}
if err := w.commitUncle(env, uncle.Header()); err != nil {
log.Trace("Possible uncle rejected", "hash", hash, "reason", err)
} else {
log.Debug("Committing new uncle to block", "hash", hash)
uncles = append(uncles, uncle.Header())
}
}
}
// Prefer to locally generated uncle
commitUncles(w.localUncles)
commitUncles(w.remoteUncles)
// Create an empty block based on temporary copied state for
// sealing in advance without waiting block execution finished.
if !noempty && atomic.LoadUint32(&w.noempty) == 0 {
w.commit(uncles, nil, false, tstart)
}
return env, nil
}
// fillTransactions retrieves the pending transactions from the txpool and fills them
// into the given sealing block. The transaction selection and ordering strategy can
// be customized with the plugin in the future.
func (w *worker) fillTransactions(interrupt *int32, env *environment) {
// Split the pending transactions into locals and remotes
// Fill the block with all available pending transactions.
pending := w.eth.TxPool().Pending(true)
// Short circuit if there is no available pending transactions.
// But if we disable empty precommit already, ignore it. Since
// empty block is necessary to keep the liveness of the network.
if len(pending) == 0 && atomic.LoadUint32(&w.noempty) == 0 {
w.updateSnapshot()
return
}
// Split the pending transactions into locals and remotes
localTxs, remoteTxs := make(map[common.Address]types.Transactions), pending
for _, account := range w.eth.TxPool().Locals() {
if txs := remoteTxs[account]; len(txs) > 0 {
@ -1015,57 +1062,136 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64)
}
}
if len(localTxs) > 0 {
txs := types.NewTransactionsByPriceAndNonce(w.current.signer, localTxs, header.BaseFee)
if w.commitTransactions(txs, w.coinbase, interrupt) {
txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee)
if w.commitTransactions(env, txs, interrupt) {
return
}
}
if len(remoteTxs) > 0 {
txs := types.NewTransactionsByPriceAndNonce(w.current.signer, remoteTxs, header.BaseFee)
if w.commitTransactions(txs, w.coinbase, interrupt) {
txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee)
if w.commitTransactions(env, txs, interrupt) {
return
}
}
w.commit(uncles, w.fullTaskHook, true, tstart)
}
// generateWork generates a sealing block based on the given parameters.
func (w *worker) generateWork(params *generateParams) (*types.Block, error) {
work, err := w.prepareWork(params)
if err != nil {
return nil, err
}
defer work.discard()
w.fillTransactions(nil, work)
return w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts)
}
// commitWork generates several new sealing tasks based on the parent block
// and submit them to the sealer.
func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) {
start := time.Now()
// Set the coinbase if the worker is running or it's required
var coinbase common.Address
if w.isRunning() {
if w.coinbase == (common.Address{}) {
log.Error("Refusing to mine without etherbase")
return
}
coinbase = w.coinbase // Use the preset address as the fee recipient
}
work, err := w.prepareWork(&generateParams{
timestamp: uint64(timestamp),
coinbase: coinbase,
})
if err != nil {
return
}
// Create an empty block based on temporary copied state for
// sealing in advance without waiting block execution finished.
if !noempty && atomic.LoadUint32(&w.noempty) == 0 {
w.commit(work.copy(), nil, false, start)
}
// Fill pending transactions from the txpool
w.fillTransactions(interrupt, work)
w.commit(work.copy(), w.fullTaskHook, true, start)
// Swap out the old work with the new one, terminating any leftover
// prefetcher processes in the mean time and starting a new one.
if w.current != nil {
w.current.discard()
}
w.current = work
}
// commit runs any post-transaction state modifications, assembles the final block
// and commits new work if consensus engine is running.
func (w *worker) commit(uncles []*types.Header, interval func(), update bool, start time.Time) error {
// Deep copy receipts here to avoid interaction between different tasks.
receipts := copyReceipts(w.current.receipts)
s := w.current.state.Copy()
block, err := w.engine.FinalizeAndAssemble(w.chain, w.current.header, s, w.current.txs, uncles, receipts)
if err != nil {
return err
}
// Note the assumption is held that the mutation is allowed to the passed env, do
// the deep copy first.
func (w *worker) commit(env *environment, interval func(), update bool, start time.Time) error {
if w.isRunning() {
if interval != nil {
interval()
}
// If we're post merge, just ignore
td, ttd := w.chain.GetTd(block.ParentHash(), block.NumberU64()-1), w.chain.Config().TerminalTotalDifficulty
if td != nil && ttd != nil && td.Cmp(ttd) >= 0 {
return nil
block, err := w.engine.FinalizeAndAssemble(w.chain, env.header, env.state, env.txs, env.unclelist(), env.receipts)
if err != nil {
return err
}
// If we're post merge, just ignore
if !w.isTTDReached(block.Header()) {
select {
case w.taskCh <- &task{receipts: receipts, state: s, block: block, createdAt: time.Now()}:
case w.taskCh <- &task{receipts: env.receipts, state: env.state, block: block, createdAt: time.Now()}:
w.unconfirmed.Shift(block.NumberU64() - 1)
log.Info("Commit new mining work", "number", block.Number(), "sealhash", w.engine.SealHash(block.Header()),
"uncles", len(uncles), "txs", w.current.tcount,
"gas", block.GasUsed(), "fees", totalFees(block, receipts),
log.Info("Commit new sealing work", "number", block.Number(), "sealhash", w.engine.SealHash(block.Header()),
"uncles", len(env.uncles), "txs", env.tcount,
"gas", block.GasUsed(), "fees", totalFees(block, env.receipts),
"elapsed", common.PrettyDuration(time.Since(start)))
case <-w.exitCh:
log.Info("Worker has exited")
}
}
}
if update {
w.updateSnapshot()
w.updateSnapshot(env)
}
return nil
}
// getSealingBlock generates the sealing block based on the given parameters.
func (w *worker) getSealingBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash) (*types.Block, error) {
req := &getWorkReq{
params: &generateParams{
timestamp: timestamp,
forceTime: true,
parentHash: parent,
coinbase: coinbase,
random: random,
noUncle: true,
noExtra: true,
},
result: make(chan *types.Block, 1),
}
select {
case w.getWorkCh <- req:
block := <-req.result
if block == nil {
return nil, req.err
}
return block, nil
case <-w.exitCh:
return nil, errors.New("miner closed")
}
}
// isTTDReached returns the indicator if the given block has reached the total
// terminal difficulty for The Merge transition.
func (w *worker) isTTDReached(header *types.Header) bool {
td, ttd := w.chain.GetTd(header.ParentHash, header.Number.Uint64()-1), w.chain.Config().TerminalTotalDifficulty
return td != nil && ttd != nil && td.Cmp(ttd) >= 0
}
// copyReceipts makes a deep copy of the given receipts.
func copyReceipts(receipts []*types.Receipt) []*types.Receipt {
result := make([]*types.Receipt, len(receipts))

View File

@ -17,6 +17,7 @@
package miner
import (
"errors"
"math/big"
"math/rand"
"sync/atomic"
@ -30,6 +31,7 @@ import (
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
@ -166,6 +168,9 @@ func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine
func (b *testWorkerBackend) BlockChain() *core.BlockChain { return b.chain }
func (b *testWorkerBackend) TxPool() *core.TxPool { return b.txPool }
func (b *testWorkerBackend) StateAtBlock(block *types.Block, reexec uint64, base *state.StateDB, checkLive bool, preferDisk bool) (statedb *state.StateDB, err error) {
return nil, errors.New("not supported")
}
func (b *testWorkerBackend) newRandomUncle() *types.Block {
var parent *types.Block
@ -197,7 +202,7 @@ func (b *testWorkerBackend) newRandomTx(creation bool) *types.Transaction {
func newTestWorker(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, db ethdb.Database, blocks int) (*worker, *testWorkerBackend) {
backend := newTestWorkerBackend(t, chainConfig, engine, db, blocks)
backend.txPool.AddLocals(pendingTxs)
w := newWorker(testConfig, chainConfig, engine, backend, new(event.TypeMux), nil, false, consensus.NewMerger(rawdb.NewMemoryDatabase()))
w := newWorker(testConfig, chainConfig, engine, backend, new(event.TypeMux), nil, false)
w.setEtherbase(testBankAddress)
return w, backend
}
@ -521,3 +526,144 @@ func testAdjustInterval(t *testing.T, chainConfig *params.ChainConfig, engine co
t.Error("interval reset timeout")
}
}
func TestGetSealingWorkEthash(t *testing.T) {
testGetSealingWork(t, ethashChainConfig, ethash.NewFaker(), false)
}
func TestGetSealingWorkClique(t *testing.T) {
testGetSealingWork(t, cliqueChainConfig, clique.New(cliqueChainConfig.Clique, rawdb.NewMemoryDatabase()), false)
}
func TestGetSealingWorkPostMerge(t *testing.T) {
local := new(params.ChainConfig)
*local = *ethashChainConfig
local.TerminalTotalDifficulty = big.NewInt(0)
testGetSealingWork(t, local, ethash.NewFaker(), true)
}
func testGetSealingWork(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, postMerge bool) {
defer engine.Close()
w, b := newTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0)
defer w.close()
w.setExtra([]byte{0x01, 0x02})
w.postSideBlock(core.ChainSideEvent{Block: b.uncleBlock})
w.skipSealHook = func(task *task) bool {
return true
}
w.fullTaskHook = func() {
time.Sleep(100 * time.Millisecond)
}
timestamp := uint64(time.Now().Unix())
assertBlock := func(block *types.Block, number uint64, coinbase common.Address, random common.Hash) {
if block.Time() != timestamp {
// Sometime the timestamp will be mutated if the timestamp
// is even smaller than parent block's. It's OK.
t.Logf("Invalid timestamp, want %d, get %d", timestamp, block.Time())
}
if len(block.Uncles()) != 0 {
t.Error("Unexpected uncle block")
}
_, isClique := engine.(*clique.Clique)
if !isClique {
if len(block.Extra()) != 0 {
t.Error("Unexpected extra field")
}
if block.Coinbase() != coinbase {
t.Errorf("Unexpected coinbase got %x want %x", block.Coinbase(), coinbase)
}
} else {
if block.Coinbase() != (common.Address{}) {
t.Error("Unexpected coinbase")
}
}
if !isClique {
if block.MixDigest() != random {
t.Error("Unexpected mix digest")
}
}
if block.Nonce() != 0 {
t.Error("Unexpected block nonce")
}
if block.NumberU64() != number {
t.Errorf("Mismatched block number, want %d got %d", number, block.NumberU64())
}
}
var cases = []struct {
parent common.Hash
coinbase common.Address
random common.Hash
expectNumber uint64
expectErr bool
}{
{
b.chain.Genesis().Hash(),
common.HexToAddress("0xdeadbeef"),
common.HexToHash("0xcafebabe"),
uint64(1),
false,
},
{
b.chain.CurrentBlock().Hash(),
common.HexToAddress("0xdeadbeef"),
common.HexToHash("0xcafebabe"),
b.chain.CurrentBlock().NumberU64() + 1,
false,
},
{
b.chain.CurrentBlock().Hash(),
common.Address{},
common.HexToHash("0xcafebabe"),
b.chain.CurrentBlock().NumberU64() + 1,
false,
},
{
b.chain.CurrentBlock().Hash(),
common.Address{},
common.Hash{},
b.chain.CurrentBlock().NumberU64() + 1,
false,
},
{
common.HexToHash("0xdeadbeef"),
common.HexToAddress("0xdeadbeef"),
common.HexToHash("0xcafebabe"),
0,
true,
},
}
// This API should work even when the automatic sealing is not enabled
for _, c := range cases {
block, err := w.getSealingBlock(c.parent, timestamp, c.coinbase, c.random)
if c.expectErr {
if err == nil {
t.Error("Expect error but get nil")
}
} else {
if err != nil {
t.Errorf("Unexpected error %v", err)
}
assertBlock(block, c.expectNumber, c.coinbase, c.random)
}
}
// This API should work even when the automatic sealing is enabled
w.start()
for _, c := range cases {
block, err := w.getSealingBlock(c.parent, timestamp, c.coinbase, c.random)
if c.expectErr {
if err == nil {
t.Error("Expect error but get nil")
}
} else {
if err != nil {
t.Errorf("Unexpected error %v", err)
}
assertBlock(block, c.expectNumber, c.coinbase, c.random)
}
}
}