Patch for concurrent iterator & others (onto v1.11.6) #386

Closed
roysc wants to merge 1565 commits from v1.11.6-statediff-v5 into master
4 changed files with 53 additions and 21 deletions
Showing only changes of commit ca948b8579 - Show all commits

View File

@ -18,8 +18,6 @@
package catalyst package catalyst
import ( import (
"crypto/sha256"
"encoding/binary"
"errors" "errors"
"fmt" "fmt"
"math/big" "math/big"
@ -288,12 +286,17 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV1(update beacon.ForkchoiceStateV1, pa
FeeRecipient: payloadAttributes.SuggestedFeeRecipient, FeeRecipient: payloadAttributes.SuggestedFeeRecipient,
Random: payloadAttributes.Random, Random: payloadAttributes.Random,
} }
id := args.Id()
// If we already are busy generating this work, then we do not need
// to start a second process.
if api.localBlocks.has(id) {
return valid(&id), nil
}
payload, err := api.eth.Miner().BuildPayload(args) payload, err := api.eth.Miner().BuildPayload(args)
if err != nil { if err != nil {
log.Error("Failed to build payload", "err", err) log.Error("Failed to build payload", "err", err)
return valid(nil), beacon.InvalidPayloadAttributes.With(err) return valid(nil), beacon.InvalidPayloadAttributes.With(err)
} }
id := computePayloadId(update.HeadBlockHash, payloadAttributes)
api.localBlocks.put(id, payload) api.localBlocks.put(id, payload)
return valid(&id), nil return valid(&id), nil
} }
@ -443,19 +446,6 @@ func (api *ConsensusAPI) NewPayloadV1(params beacon.ExecutableDataV1) (beacon.Pa
return beacon.PayloadStatusV1{Status: beacon.VALID, LatestValidHash: &hash}, nil return beacon.PayloadStatusV1{Status: beacon.VALID, LatestValidHash: &hash}, nil
} }
// computePayloadId computes a pseudo-random payloadid, based on the parameters.
func computePayloadId(headBlockHash common.Hash, params *beacon.PayloadAttributesV1) beacon.PayloadID {
// Hash
hasher := sha256.New()
hasher.Write(headBlockHash[:])
binary.Write(hasher, binary.BigEndian, params.Timestamp)
hasher.Write(params.Random[:])
hasher.Write(params.SuggestedFeeRecipient[:])
var out beacon.PayloadID
copy(out[:], hasher.Sum(nil)[:8])
return out
}
// delayPayloadImport stashes the given block away for import at a later time, // delayPayloadImport stashes the given block away for import at a later time,
// either via a forkchoice update or a sync extension. This method is meant to // either via a forkchoice update or a sync extension. This method is meant to
// be called by the newpayload command when the block seems to be ok, but some // be called by the newpayload command when the block seems to be ok, but some

View File

@ -184,7 +184,12 @@ func TestEth2PrepareAndGetPayload(t *testing.T) {
} }
// give the payload some time to be built // give the payload some time to be built
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
payloadID := computePayloadId(fcState.HeadBlockHash, &blockParams) payloadID := (&miner.BuildPayloadArgs{
Parent: fcState.HeadBlockHash,
Timestamp: blockParams.Timestamp,
FeeRecipient: blockParams.SuggestedFeeRecipient,
Random: blockParams.Random,
}).Id()
execData, err := api.GetPayloadV1(payloadID) execData, err := api.GetPayloadV1(payloadID)
if err != nil { if err != nil {
t.Fatalf("error getting payload, err=%v", err) t.Fatalf("error getting payload, err=%v", err)

View File

@ -85,6 +85,22 @@ func (q *payloadQueue) get(id beacon.PayloadID) *beacon.ExecutableDataV1 {
return nil return nil
} }
// has checks if a particular payload is already tracked.
func (q *payloadQueue) has(id beacon.PayloadID) bool {
q.lock.RLock()
defer q.lock.RUnlock()
for _, item := range q.payloads {
if item == nil {
return false
}
if item.id == id {
return true
}
}
return false
}
// headerQueueItem represents an hash->header tuple to store until it's retrieved // headerQueueItem represents an hash->header tuple to store until it's retrieved
// or evicted. // or evicted.
type headerQueueItem struct { type headerQueueItem struct {

View File

@ -17,6 +17,8 @@
package miner package miner
import ( import (
"crypto/sha256"
"encoding/binary"
"math/big" "math/big"
"sync" "sync"
"time" "time"
@ -38,12 +40,26 @@ type BuildPayloadArgs struct {
Random common.Hash // The provided randomness value Random common.Hash // The provided randomness value
} }
// Id computes an 8-byte identifier by hashing the components of the payload arguments.
func (args *BuildPayloadArgs) Id() beacon.PayloadID {
// Hash
hasher := sha256.New()
hasher.Write(args.Parent[:])
binary.Write(hasher, binary.BigEndian, args.Timestamp)
hasher.Write(args.Random[:])
hasher.Write(args.FeeRecipient[:])
var out beacon.PayloadID
copy(out[:], hasher.Sum(nil)[:8])
return out
}
// Payload wraps the built payload(block waiting for sealing). According to the // Payload wraps the built payload(block waiting for sealing). According to the
// engine-api specification, EL should build the initial version of the payload // engine-api specification, EL should build the initial version of the payload
// which has an empty transaction set and then keep update it in order to maximize // which has an empty transaction set and then keep update it in order to maximize
// the revenue. Therefore, the empty-block here is always available and full-block // the revenue. Therefore, the empty-block here is always available and full-block
// will be set/updated afterwards. // will be set/updated afterwards.
type Payload struct { type Payload struct {
id beacon.PayloadID
empty *types.Block empty *types.Block
full *types.Block full *types.Block
fullFees *big.Int fullFees *big.Int
@ -53,11 +69,13 @@ type Payload struct {
} }
// newPayload initializes the payload object. // newPayload initializes the payload object.
func newPayload(empty *types.Block) *Payload { func newPayload(empty *types.Block, id beacon.PayloadID) *Payload {
payload := &Payload{ payload := &Payload{
id: id,
empty: empty, empty: empty,
stop: make(chan struct{}), stop: make(chan struct{}),
} }
log.Info("Starting work on payload", "id", payload.id)
payload.cond = sync.NewCond(&payload.lock) payload.cond = sync.NewCond(&payload.lock)
return payload return payload
} }
@ -80,8 +98,9 @@ func (payload *Payload) update(block *types.Block, fees *big.Int, elapsed time.D
payload.fullFees = fees payload.fullFees = fees
feesInEther := new(big.Float).Quo(new(big.Float).SetInt(fees), big.NewFloat(params.Ether)) feesInEther := new(big.Float).Quo(new(big.Float).SetInt(fees), big.NewFloat(params.Ether))
log.Info("Updated payload", "number", block.NumberU64(), "hash", block.Hash(), log.Info("Updated payload", "id", payload.id, "number", block.NumberU64(), "hash", block.Hash(),
"txs", len(block.Transactions()), "gas", block.GasUsed(), "fees", feesInEther, "elapsed", common.PrettyDuration(elapsed)) "txs", len(block.Transactions()), "gas", block.GasUsed(), "fees", feesInEther,
"root", block.Root(), "elapsed", common.PrettyDuration(elapsed))
} }
payload.cond.Broadcast() // fire signal for notifying full block payload.cond.Broadcast() // fire signal for notifying full block
} }
@ -139,7 +158,7 @@ func (w *worker) buildPayload(args *BuildPayloadArgs) (*Payload, error) {
return nil, err return nil, err
} }
// Construct a payload object for return. // Construct a payload object for return.
payload := newPayload(empty) payload := newPayload(empty, args.Id())
// Spin up a routine for updating the payload in background. This strategy // Spin up a routine for updating the payload in background. This strategy
// can maximum the revenue for including transactions with highest fee. // can maximum the revenue for including transactions with highest fee.
@ -164,8 +183,10 @@ func (w *worker) buildPayload(args *BuildPayloadArgs) (*Payload, error) {
} }
timer.Reset(w.recommit) timer.Reset(w.recommit)
case <-payload.stop: case <-payload.stop:
log.Info("Stopping work on payload", "id", payload.id, "reason", "delivery")
return return
case <-endTimer.C: case <-endTimer.C:
log.Info("Stopping work on payload", "id", payload.id, "reason", "timeout")
return return
} }
} }