Patch for concurrent iterator & others (onto v1.11.6) #386

Closed
roysc wants to merge 1565 commits from v1.11.6-statediff-v5 into master
9 changed files with 338 additions and 131 deletions
Showing only changes of commit a2a144c593 - Show all commits

View File

@ -34,6 +34,7 @@ import (
"github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/miner"
"github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
) )
@ -279,23 +280,21 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV1(update beacon.ForkchoiceStateV1, pa
} }
// If payload generation was requested, create a new block to be potentially // If payload generation was requested, create a new block to be potentially
// sealed by the beacon client. The payload will be requested later, and we // sealed by the beacon client. The payload will be requested later, and we
// might replace it arbitrarily many times in between. // will replace it arbitrarily many times in between.
if payloadAttributes != nil { if payloadAttributes != nil {
// Create an empty block first which can be used as a fallback args := &miner.BuildPayloadArgs{
empty, err := api.eth.Miner().GetSealingBlockSync(update.HeadBlockHash, payloadAttributes.Timestamp, payloadAttributes.SuggestedFeeRecipient, payloadAttributes.Random, true) Parent: update.HeadBlockHash,
if err != nil { Timestamp: payloadAttributes.Timestamp,
log.Error("Failed to create empty sealing payload", "err", err) FeeRecipient: payloadAttributes.SuggestedFeeRecipient,
return valid(nil), beacon.InvalidPayloadAttributes.With(err) Random: payloadAttributes.Random,
} }
// Send a request to generate a full block in the background. payload, err := api.eth.Miner().BuildPayload(args)
// The result can be obtained via the returned channel.
resCh, err := api.eth.Miner().GetSealingBlockAsync(update.HeadBlockHash, payloadAttributes.Timestamp, payloadAttributes.SuggestedFeeRecipient, payloadAttributes.Random, false)
if err != nil { if err != nil {
log.Error("Failed to create async sealing payload", "err", err) log.Error("Failed to build payload", "err", err)
return valid(nil), beacon.InvalidPayloadAttributes.With(err) return valid(nil), beacon.InvalidPayloadAttributes.With(err)
} }
id := computePayloadId(update.HeadBlockHash, payloadAttributes) id := computePayloadId(update.HeadBlockHash, payloadAttributes)
api.localBlocks.put(id, &payload{empty: empty, result: resCh}) api.localBlocks.put(id, payload)
return valid(&id), nil return valid(&id), nil
} }
return valid(nil), nil return valid(nil), nil

View File

@ -34,6 +34,7 @@ import (
"github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/ethconfig" "github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/miner"
"github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
@ -181,6 +182,8 @@ func TestEth2PrepareAndGetPayload(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("error preparing payload, err=%v", err) t.Fatalf("error preparing payload, err=%v", err)
} }
// give the payload some time to be built
time.Sleep(100 * time.Millisecond)
payloadID := computePayloadId(fcState.HeadBlockHash, &blockParams) payloadID := computePayloadId(fcState.HeadBlockHash, &blockParams)
execData, err := api.GetPayloadV1(payloadID) execData, err := api.GetPayloadV1(payloadID)
if err != nil { if err != nil {
@ -586,12 +589,12 @@ func TestNewPayloadOnInvalidChain(t *testing.T) {
if resp.PayloadStatus.Status != beacon.VALID { if resp.PayloadStatus.Status != beacon.VALID {
t.Fatalf("error preparing payload, invalid status: %v", resp.PayloadStatus.Status) t.Fatalf("error preparing payload, invalid status: %v", resp.PayloadStatus.Status)
} }
// give the payload some time to be built
time.Sleep(100 * time.Millisecond)
payload, err := api.GetPayloadV1(*resp.PayloadID) payload, err := api.GetPayloadV1(*resp.PayloadID)
if err != nil { if err != nil {
t.Fatalf("can't get payload: %v", err) t.Fatalf("can't get payload: %v", err)
} }
// TODO(493456442, marius) this test can be flaky since we rely on a 100ms
// allowance for block generation internally.
if len(payload.Transactions) == 0 { if len(payload.Transactions) == 0 {
t.Fatalf("payload should not be empty") t.Fatalf("payload should not be empty")
} }
@ -618,11 +621,17 @@ func TestNewPayloadOnInvalidChain(t *testing.T) {
} }
func assembleBlock(api *ConsensusAPI, parentHash common.Hash, params *beacon.PayloadAttributesV1) (*beacon.ExecutableDataV1, error) { func assembleBlock(api *ConsensusAPI, parentHash common.Hash, params *beacon.PayloadAttributesV1) (*beacon.ExecutableDataV1, error) {
block, err := api.eth.Miner().GetSealingBlockSync(parentHash, params.Timestamp, params.SuggestedFeeRecipient, params.Random, false) args := &miner.BuildPayloadArgs{
Parent: parentHash,
Timestamp: params.Timestamp,
FeeRecipient: params.SuggestedFeeRecipient,
Random: params.Random,
}
payload, err := api.eth.Miner().BuildPayload(args)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return beacon.BlockToExecutableData(block), nil return payload.ResolveFull(), nil
} }
func TestEmptyBlocks(t *testing.T) { func TestEmptyBlocks(t *testing.T) {
@ -854,16 +863,17 @@ func TestNewPayloadOnInvalidTerminalBlock(t *testing.T) {
} }
// Test parent already post TTD in NewPayload // Test parent already post TTD in NewPayload
params := beacon.PayloadAttributesV1{ args := &miner.BuildPayloadArgs{
Parent: parent.Hash(),
Timestamp: parent.Time() + 1, Timestamp: parent.Time() + 1,
Random: crypto.Keccak256Hash([]byte{byte(1)}), Random: crypto.Keccak256Hash([]byte{byte(1)}),
SuggestedFeeRecipient: parent.Coinbase(), FeeRecipient: parent.Coinbase(),
} }
empty, err := api.eth.Miner().GetSealingBlockSync(parent.Hash(), params.Timestamp, params.SuggestedFeeRecipient, params.Random, true) payload, err := api.eth.Miner().BuildPayload(args)
if err != nil { if err != nil {
t.Fatalf("error preparing payload, err=%v", err) t.Fatalf("error preparing payload, err=%v", err)
} }
data := *beacon.BlockToExecutableData(empty) data := *payload.Resolve()
resp2, err := api.NewPayloadV1(data) resp2, err := api.NewPayloadV1(data)
if err != nil { if err != nil {
t.Fatalf("error sending NewPayload, err=%v", err) t.Fatalf("error sending NewPayload, err=%v", err)

View File

@ -18,11 +18,11 @@ package catalyst
import ( import (
"sync" "sync"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/beacon" "github.com/ethereum/go-ethereum/core/beacon"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/miner"
) )
// maxTrackedPayloads is the maximum number of prepared payloads the execution // maxTrackedPayloads is the maximum number of prepared payloads the execution
@ -35,52 +35,11 @@ const maxTrackedPayloads = 10
// latest one; but have a slight wiggle room for non-ideal conditions. // latest one; but have a slight wiggle room for non-ideal conditions.
const maxTrackedHeaders = 10 const maxTrackedHeaders = 10
// payload wraps the miner's block production channel, allowing the mined block
// to be retrieved later upon the GetPayload engine API call.
type payload struct {
lock sync.Mutex
done bool
empty *types.Block
block *types.Block
result chan *types.Block
}
// resolve extracts the generated full block from the given channel if possible
// or fallback to empty block as an alternative.
func (req *payload) resolve() *beacon.ExecutableDataV1 {
// this function can be called concurrently, prevent any
// concurrency issue in the first place.
req.lock.Lock()
defer req.lock.Unlock()
// Try to resolve the full block first if it's not obtained
// yet. The returned block can be nil if the generation fails.
if !req.done {
timeout := time.NewTimer(500 * time.Millisecond)
defer timeout.Stop()
select {
case req.block = <-req.result:
req.done = true
case <-timeout.C:
// TODO(rjl49345642, Marius), should we keep this
// 100ms timeout allowance? Why not just use the
// default and then fallback to empty directly?
}
}
if req.block != nil {
return beacon.BlockToExecutableData(req.block)
}
return beacon.BlockToExecutableData(req.empty)
}
// payloadQueueItem represents an id->payload tuple to store until it's retrieved // payloadQueueItem represents an id->payload tuple to store until it's retrieved
// or evicted. // or evicted.
type payloadQueueItem struct { type payloadQueueItem struct {
id beacon.PayloadID id beacon.PayloadID
data *payload payload *miner.Payload
} }
// payloadQueue tracks the latest handful of constructed payloads to be retrieved // payloadQueue tracks the latest handful of constructed payloads to be retrieved
@ -99,14 +58,14 @@ func newPayloadQueue() *payloadQueue {
} }
// put inserts a new payload into the queue at the given id. // put inserts a new payload into the queue at the given id.
func (q *payloadQueue) put(id beacon.PayloadID, data *payload) { func (q *payloadQueue) put(id beacon.PayloadID, payload *miner.Payload) {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
copy(q.payloads[1:], q.payloads) copy(q.payloads[1:], q.payloads)
q.payloads[0] = &payloadQueueItem{ q.payloads[0] = &payloadQueueItem{
id: id, id: id,
data: data, payload: payload,
} }
} }
@ -120,7 +79,7 @@ func (q *payloadQueue) get(id beacon.PayloadID) *beacon.ExecutableDataV1 {
return nil // no more items return nil // no more items
} }
if item.id == id { if item.id == id {
return item.data.resolve() return item.payload.Resolve()
} }
} }
return nil return nil

View File

@ -251,26 +251,7 @@ func (miner *Miner) SubscribePendingLogs(ch chan<- []*types.Log) event.Subscript
return miner.worker.pendingLogsFeed.Subscribe(ch) return miner.worker.pendingLogsFeed.Subscribe(ch)
} }
// GetSealingBlockAsync requests to generate a sealing block according to the // BuildPayload builds the payload according to the provided parameters.
// given parameters. Regardless of whether the generation is successful or not, func (miner *Miner) BuildPayload(args *BuildPayloadArgs) (*Payload, error) {
// there is always a result that will be returned through the result channel. return miner.worker.buildPayload(args)
// The difference is that if the execution fails, the returned result is nil
// and the concrete error is dropped silently.
func (miner *Miner) GetSealingBlockAsync(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool) (chan *types.Block, error) {
resCh, _, err := miner.worker.getSealingBlock(parent, timestamp, coinbase, random, noTxs)
if err != nil {
return nil, err
}
return resCh, nil
}
// GetSealingBlockSync creates a sealing block according to the given parameters.
// If the generation is failed or the underlying work is already closed, an error
// will be returned.
func (miner *Miner) GetSealingBlockSync(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool) (*types.Block, error) {
resCh, errCh, err := miner.worker.getSealingBlock(parent, timestamp, coinbase, random, noTxs)
if err != nil {
return nil, err
}
return <-resCh, <-errCh
} }

168
miner/payload_building.go Normal file
View File

@ -0,0 +1,168 @@
// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>
package miner
import (
"math/big"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/beacon"
"github.com/ethereum/go-ethereum/core/types"
)
// BuildPayloadArgs contains the provided parameters for building payload.
// Check engine-api specification for more details.
// https://github.com/ethereum/execution-apis/blob/main/src/engine/specification.md#payloadattributesv1
type BuildPayloadArgs struct {
Parent common.Hash // The parent block to build payload on top
Timestamp uint64 // The provided timestamp of generated payload
FeeRecipient common.Address // The provided recipient address for collecting transaction fee
Random common.Hash // The provided randomness value
}
// Payload wraps the built payload(block waiting for sealing). According to the
// engine-api specification, EL should build the initial version of the payload
// which has an empty transaction set and then keep update it in order to maximize
// the revenue. Therefore, the empty-block here is always available and full-block
// will be set/updated afterwards.
type Payload struct {
empty *types.Block
full *types.Block
fullFees *big.Int
stop chan struct{}
lock *sync.Mutex
cond *sync.Cond
}
// newPayload initializes the payload object.
func newPayload(empty *types.Block) *Payload {
lock := new(sync.Mutex)
return &Payload{
empty: empty,
stop: make(chan struct{}),
lock: lock,
cond: sync.NewCond(lock),
}
}
// update updates the full-block with latest built version.
func (payload *Payload) update(block *types.Block, fees *big.Int) {
payload.lock.Lock()
defer payload.lock.Unlock()
select {
case <-payload.stop:
return // reject stale update
default:
}
// Ensure the newly provided full block has a higher transaction fee.
// In post-merge stage, there is no uncle reward anymore and transaction
// fee(apart from the mev revenue) is the only indicator for comparison.
if payload.full == nil || fees.Cmp(payload.fullFees) > 0 {
payload.full = block
payload.fullFees = fees
}
payload.cond.Broadcast() // fire signal for notifying full block
}
// Resolve returns the latest built payload and also terminates the background
// thread for updating payload. It's safe to be called multiple times.
func (payload *Payload) Resolve() *beacon.ExecutableDataV1 {
payload.lock.Lock()
defer payload.lock.Unlock()
select {
case <-payload.stop:
default:
close(payload.stop)
}
if payload.full != nil {
return beacon.BlockToExecutableData(payload.full)
}
return beacon.BlockToExecutableData(payload.empty)
}
// ResolveEmpty is basically identical to Resolve, but it expects empty block only.
// It's only used in tests.
func (payload *Payload) ResolveEmpty() *beacon.ExecutableDataV1 {
payload.lock.Lock()
defer payload.lock.Unlock()
return beacon.BlockToExecutableData(payload.empty)
}
// ResolveFull is basically identical to Resolve, but it expects full block only.
// It's only used in tests.
func (payload *Payload) ResolveFull() *beacon.ExecutableDataV1 {
payload.lock.Lock()
defer payload.lock.Unlock()
if payload.full == nil {
select {
case <-payload.stop:
return nil
default:
}
payload.cond.Wait()
}
return beacon.BlockToExecutableData(payload.full)
}
// buildPayload builds the payload according to the provided parameters.
func (w *worker) buildPayload(args *BuildPayloadArgs) (*Payload, error) {
// Build the initial version with no transaction included. It should be fast
// enough to run. The empty payload can at least make sure there is something
// to deliver for not missing slot.
empty, _, err := w.getSealingBlock(args.Parent, args.Timestamp, args.FeeRecipient, args.Random, true)
if err != nil {
return nil, err
}
// Construct a payload object for return.
payload := newPayload(empty)
// Spin up a routine for updating the payload in background. This strategy
// can maximum the revenue for including transactions with highest fee.
go func() {
// Setup the timer for re-building the payload. The initial clock is kept
// for triggering process immediately.
timer := time.NewTimer(0)
defer timer.Stop()
// Setup the timer for terminating the process if SECONDS_PER_SLOT (12s in
// the Mainnet configuration) have passed since the point in time identified
// by the timestamp parameter.
endTimer := time.NewTimer(time.Second * 12)
for {
select {
case <-timer.C:
block, fees, err := w.getSealingBlock(args.Parent, args.Timestamp, args.FeeRecipient, args.Random, false)
if err == nil {
payload.update(block, fees)
}
timer.Reset(w.recommit)
case <-payload.stop:
return
case <-endTimer.C:
return
}
}
}()
return payload, nil
}

View File

@ -0,0 +1,80 @@
// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>
package miner
import (
"reflect"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core/beacon"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/params"
)
func TestBuildPayload(t *testing.T) {
var (
db = rawdb.NewMemoryDatabase()
recipient = common.HexToAddress("0xdeadbeef")
)
w, b := newTestWorker(t, params.TestChainConfig, ethash.NewFaker(), db, 0)
defer w.close()
timestamp := uint64(time.Now().Unix())
args := &BuildPayloadArgs{
Parent: b.chain.CurrentBlock().Hash(),
Timestamp: timestamp,
Random: common.Hash{},
FeeRecipient: recipient,
}
payload, err := w.buildPayload(args)
if err != nil {
t.Fatalf("Failed to build payload %v", err)
}
verify := func(data *beacon.ExecutableDataV1, txs int) {
if data.ParentHash != b.chain.CurrentBlock().Hash() {
t.Fatal("Unexpect parent hash")
}
if data.Random != (common.Hash{}) {
t.Fatal("Unexpect random value")
}
if data.Timestamp != timestamp {
t.Fatal("Unexpect timestamp")
}
if data.FeeRecipient != recipient {
t.Fatal("Unexpect fee recipient")
}
if len(data.Transactions) != txs {
t.Fatal("Unexpect transaction set")
}
}
empty := payload.ResolveEmpty()
verify(empty, 0)
full := payload.ResolveFull()
verify(full, len(pendingTxs))
// Ensure resolve can be called multiple times and the
// result should be unchanged
dataOne := payload.Resolve()
dataTwo := payload.Resolve()
if !reflect.DeepEqual(dataOne, dataTwo) {
t.Fatal("Unexpected payload data")
}
}

View File

@ -81,8 +81,8 @@ var (
transitionDifficulty = new(big.Int).Mul(big.NewInt(20), params.MinimumDifficulty) transitionDifficulty = new(big.Int).Mul(big.NewInt(20), params.MinimumDifficulty)
// blockInterval is the time interval for creating a new eth2 block // blockInterval is the time interval for creating a new eth2 block
blockInterval = time.Second * 3
blockIntervalInt = 3 blockIntervalInt = 3
blockInterval = time.Second * time.Duration(blockIntervalInt)
// finalizationDist is the block distance for finalizing block // finalizationDist is the block distance for finalizing block
finalizationDist = 10 finalizationDist = 10
@ -164,6 +164,7 @@ func (n *ethNode) assembleBlock(parentHash common.Hash, parentTimestamp uint64)
if err != nil { if err != nil {
return nil, err return nil, err
} }
time.Sleep(time.Second * 5) // give enough time for block creation
return n.api.GetPayloadV1(*payload.PayloadID) return n.api.GetPayloadV1(*payload.PayloadID)
} }
@ -316,17 +317,14 @@ func (mgr *nodeManager) run() {
} }
nodes := mgr.getNodes(eth2MiningNode) nodes := mgr.getNodes(eth2MiningNode)
nodes = append(nodes, mgr.getNodes(eth2NormalNode)...) nodes = append(nodes, mgr.getNodes(eth2NormalNode)...)
nodes = append(nodes, mgr.getNodes(eth2LightClient)...) //nodes = append(nodes, mgr.getNodes(eth2LightClient)...)
for _, node := range nodes { for _, node := range nodes {
fcState := beacon.ForkchoiceStateV1{ fcState := beacon.ForkchoiceStateV1{
HeadBlockHash: oldest.Hash(), HeadBlockHash: parentBlock.Hash(),
SafeBlockHash: common.Hash{}, SafeBlockHash: oldest.Hash(),
FinalizedBlockHash: oldest.Hash(), FinalizedBlockHash: oldest.Hash(),
} }
// TODO(rjl493456442) finalization doesn't work properly, FIX IT node.api.ForkchoiceUpdatedV1(fcState, nil)
_ = fcState
_ = node
//node.api.ForkchoiceUpdatedV1(fcState, nil)
} }
log.Info("Finalised eth2 block", "number", oldest.NumberU64(), "hash", oldest.Hash()) log.Info("Finalised eth2 block", "number", oldest.NumberU64(), "hash", oldest.Hash())
waitFinalise = waitFinalise[1:] waitFinalise = waitFinalise[1:]
@ -423,7 +421,7 @@ func main() {
node := nodes[index%len(nodes)] node := nodes[index%len(nodes)]
// Create a self transaction and inject into the pool // Create a self transaction and inject into the pool
tx, err := types.SignTx(types.NewTransaction(nonces[index], crypto.PubkeyToAddress(faucets[index].PublicKey), new(big.Int), 21000, big.NewInt(100000000000+rand.Int63n(65536)), nil), types.HomesteadSigner{}, faucets[index]) tx, err := types.SignTx(types.NewTransaction(nonces[index], crypto.PubkeyToAddress(faucets[index].PublicKey), new(big.Int), 21000, big.NewInt(10_000_000_000+rand.Int63n(6_553_600_000)), nil), types.HomesteadSigner{}, faucets[index])
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -492,7 +490,7 @@ func makeFullNode(genesis *core.Genesis) (*node.Node, *eth.Ethereum, *ethcatalys
GasFloor: genesis.GasLimit * 9 / 10, GasFloor: genesis.GasLimit * 9 / 10,
GasCeil: genesis.GasLimit * 11 / 10, GasCeil: genesis.GasLimit * 11 / 10,
GasPrice: big.NewInt(1), GasPrice: big.NewInt(1),
Recommit: 10 * time.Second, // Disable the recommit Recommit: 1 * time.Second,
}, },
LightServ: 100, LightServ: 100,
LightPeers: 10, LightPeers: 10,

View File

@ -169,11 +169,17 @@ type newWorkReq struct {
timestamp int64 timestamp int64
} }
// newPayloadResult represents a result struct corresponds to payload generation.
type newPayloadResult struct {
err error
block *types.Block
fees *big.Int
}
// getWorkReq represents a request for getting a new sealing work with provided parameters. // getWorkReq represents a request for getting a new sealing work with provided parameters.
type getWorkReq struct { type getWorkReq struct {
params *generateParams params *generateParams
result chan *types.Block // non-blocking channel result chan *newPayloadResult // non-blocking channel
err chan error
} }
// intervalAdjust represents a resubmitting interval adjustment. // intervalAdjust represents a resubmitting interval adjustment.
@ -250,6 +256,10 @@ type worker struct {
// in case there are some computation expensive transactions in txpool. // in case there are some computation expensive transactions in txpool.
newpayloadTimeout time.Duration newpayloadTimeout time.Duration
// recommit is the time interval to re-create sealing work or to re-build
// payload in proof-of-stake stage.
recommit time.Duration
// External functions // External functions
isLocalBlock func(header *types.Header) bool // Function used to determine whether the specified block is mined by local miner. isLocalBlock func(header *types.Header) bool // Function used to determine whether the specified block is mined by local miner.
@ -297,6 +307,8 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus
log.Warn("Sanitizing miner recommit interval", "provided", recommit, "updated", minRecommitInterval) log.Warn("Sanitizing miner recommit interval", "provided", recommit, "updated", minRecommitInterval)
recommit = minRecommitInterval recommit = minRecommitInterval
} }
worker.recommit = recommit
// Sanitize the timeout config for creating payload. // Sanitize the timeout config for creating payload.
newpayloadTimeout := worker.config.NewPayloadTimeout newpayloadTimeout := worker.config.NewPayloadTimeout
if newpayloadTimeout == 0 { if newpayloadTimeout == 0 {
@ -553,13 +565,11 @@ func (w *worker) mainLoop() {
w.commitWork(req.interrupt, req.noempty, req.timestamp) w.commitWork(req.interrupt, req.noempty, req.timestamp)
case req := <-w.getWorkCh: case req := <-w.getWorkCh:
block, err := w.generateWork(req.params) block, fees, err := w.generateWork(req.params)
if err != nil { req.result <- &newPayloadResult{
req.err <- err err: err,
req.result <- nil block: block,
} else { fees: fees,
req.err <- nil
req.result <- block
} }
case ev := <-w.chainSideCh: case ev := <-w.chainSideCh:
// Short circuit for duplicate side blocks // Short circuit for duplicate side blocks
@ -1071,10 +1081,10 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment) error {
} }
// generateWork generates a sealing block based on the given parameters. // generateWork generates a sealing block based on the given parameters.
func (w *worker) generateWork(params *generateParams) (*types.Block, error) { func (w *worker) generateWork(params *generateParams) (*types.Block, *big.Int, error) {
work, err := w.prepareWork(params) work, err := w.prepareWork(params)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
defer work.discard() defer work.discard()
@ -1090,7 +1100,11 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, error) {
log.Warn("Block building is interrupted", "allowance", common.PrettyDuration(w.newpayloadTimeout)) log.Warn("Block building is interrupted", "allowance", common.PrettyDuration(w.newpayloadTimeout))
} }
} }
return w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts) block, err := w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts)
if err != nil {
return nil, nil, err
}
return block, totalFees(block, work.receipts), nil
} }
// commitWork generates several new sealing tasks based on the parent block // commitWork generates several new sealing tasks based on the parent block
@ -1180,9 +1194,12 @@ func (w *worker) commit(env *environment, interval func(), update bool, start ti
select { select {
case w.taskCh <- &task{receipts: env.receipts, state: env.state, block: block, createdAt: time.Now()}: case w.taskCh <- &task{receipts: env.receipts, state: env.state, block: block, createdAt: time.Now()}:
w.unconfirmed.Shift(block.NumberU64() - 1) w.unconfirmed.Shift(block.NumberU64() - 1)
fees := totalFees(block, env.receipts)
feesInEther := new(big.Float).Quo(new(big.Float).SetInt(fees), new(big.Float).SetInt(big.NewInt(params.Ether)))
log.Info("Commit new sealing work", "number", block.Number(), "sealhash", w.engine.SealHash(block.Header()), log.Info("Commit new sealing work", "number", block.Number(), "sealhash", w.engine.SealHash(block.Header()),
"uncles", len(env.uncles), "txs", env.tcount, "uncles", len(env.uncles), "txs", env.tcount,
"gas", block.GasUsed(), "fees", totalFees(block, env.receipts), "gas", block.GasUsed(), "fees", feesInEther,
"elapsed", common.PrettyDuration(time.Since(start))) "elapsed", common.PrettyDuration(time.Since(start)))
case <-w.exitCh: case <-w.exitCh:
@ -1199,11 +1216,7 @@ func (w *worker) commit(env *environment, interval func(), update bool, start ti
// getSealingBlock generates the sealing block based on the given parameters. // getSealingBlock generates the sealing block based on the given parameters.
// The generation result will be passed back via the given channel no matter // The generation result will be passed back via the given channel no matter
// the generation itself succeeds or not. // the generation itself succeeds or not.
func (w *worker) getSealingBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool) (chan *types.Block, chan error, error) { func (w *worker) getSealingBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool) (*types.Block, *big.Int, error) {
var (
resCh = make(chan *types.Block, 1)
errCh = make(chan error, 1)
)
req := &getWorkReq{ req := &getWorkReq{
params: &generateParams{ params: &generateParams{
timestamp: timestamp, timestamp: timestamp,
@ -1215,12 +1228,15 @@ func (w *worker) getSealingBlock(parent common.Hash, timestamp uint64, coinbase
noExtra: true, noExtra: true,
noTxs: noTxs, noTxs: noTxs,
}, },
result: resCh, result: make(chan *newPayloadResult, 1),
err: errCh,
} }
select { select {
case w.getWorkCh <- req: case w.getWorkCh <- req:
return resCh, errCh, nil result := <-req.result
if result.err != nil {
return nil, nil, result.err
}
return result.block, result.fees, nil
case <-w.exitCh: case <-w.exitCh:
return nil, nil, errors.New("miner closed") return nil, nil, errors.New("miner closed")
} }
@ -1251,14 +1267,14 @@ func (w *worker) postSideBlock(event core.ChainSideEvent) {
} }
} }
// totalFees computes total consumed miner fees in ETH. Block transactions and receipts have to have the same order. // totalFees computes total consumed miner fees in Wei. Block transactions and receipts have to have the same order.
func totalFees(block *types.Block, receipts []*types.Receipt) *big.Float { func totalFees(block *types.Block, receipts []*types.Receipt) *big.Int {
feesWei := new(big.Int) feesWei := new(big.Int)
for i, tx := range block.Transactions() { for i, tx := range block.Transactions() {
minerFee, _ := tx.EffectiveGasTip(block.BaseFee()) minerFee, _ := tx.EffectiveGasTip(block.BaseFee())
feesWei.Add(feesWei, new(big.Int).Mul(new(big.Int).SetUint64(receipts[i].GasUsed), minerFee)) feesWei.Add(feesWei, new(big.Int).Mul(new(big.Int).SetUint64(receipts[i].GasUsed), minerFee))
} }
return new(big.Float).Quo(new(big.Float).SetInt(feesWei), new(big.Float).SetInt(big.NewInt(params.Ether))) return feesWei
} }
// signalToErr converts the interruption signal to a concrete error type for return. // signalToErr converts the interruption signal to a concrete error type for return.

View File

@ -634,9 +634,7 @@ func testGetSealingWork(t *testing.T, chainConfig *params.ChainConfig, engine co
// This API should work even when the automatic sealing is not enabled // This API should work even when the automatic sealing is not enabled
for _, c := range cases { for _, c := range cases {
resChan, errChan, _ := w.getSealingBlock(c.parent, timestamp, c.coinbase, c.random, false) block, _, err := w.getSealingBlock(c.parent, timestamp, c.coinbase, c.random, false)
block := <-resChan
err := <-errChan
if c.expectErr { if c.expectErr {
if err == nil { if err == nil {
t.Error("Expect error but get nil") t.Error("Expect error but get nil")
@ -652,9 +650,7 @@ func testGetSealingWork(t *testing.T, chainConfig *params.ChainConfig, engine co
// This API should work even when the automatic sealing is enabled // This API should work even when the automatic sealing is enabled
w.start() w.start()
for _, c := range cases { for _, c := range cases {
resChan, errChan, _ := w.getSealingBlock(c.parent, timestamp, c.coinbase, c.random, false) block, _, err := w.getSealingBlock(c.parent, timestamp, c.coinbase, c.random, false)
block := <-resChan
err := <-errChan
if c.expectErr { if c.expectErr {
if err == nil { if err == nil {
t.Error("Expect error but get nil") t.Error("Expect error but get nil")