Patch for concurrent iterator & others (onto v1.11.6) #386

Closed
roysc wants to merge 1565 commits from v1.11.6-statediff-v5 into master
4 changed files with 283 additions and 85 deletions
Showing only changes of commit 4a4d531052 - Show all commits

View File

@ -43,9 +43,10 @@ var (
INVALIDBLOCKHASH = "INVALID_BLOCK_HASH"
GenericServerError = rpc.CustomError{Code: -32000, ValidationError: "Server error"}
UnknownPayload = rpc.CustomError{Code: -32001, ValidationError: "Unknown payload"}
InvalidTB = rpc.CustomError{Code: -32002, ValidationError: "Invalid terminal block"}
GenericServerError = rpc.CustomError{Code: -32000, ValidationError: "Server error"}
UnknownPayload = rpc.CustomError{Code: -38001, ValidationError: "Unknown payload"}
InvalidForkChoiceState = rpc.CustomError{Code: -38002, ValidationError: "Invalid forkchoice state"}
InvalidPayloadAttributes = rpc.CustomError{Code: -38003, ValidationError: "Invalid payload attributes"}
STATUS_INVALID = ForkChoiceResponse{PayloadStatus: PayloadStatusV1{Status: INVALID}, PayloadID: nil}
STATUS_SYNCING = ForkChoiceResponse{PayloadStatus: PayloadStatusV1{Status: SYNCING}, PayloadID: nil}

View File

@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/beacon"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
@ -165,10 +166,10 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV1(update beacon.ForkchoiceStateV1, pa
finalBlock := api.eth.BlockChain().GetBlockByHash(update.FinalizedBlockHash)
if finalBlock == nil {
log.Warn("Final block not available in database", "hash", update.FinalizedBlockHash)
return beacon.STATUS_INVALID, errors.New("final block not available")
return beacon.STATUS_INVALID, &beacon.InvalidForkChoiceState
} else if rawdb.ReadCanonicalHash(api.eth.ChainDb(), finalBlock.NumberU64()) != update.FinalizedBlockHash {
log.Warn("Final block not in canonical chain", "number", block.NumberU64(), "hash", update.HeadBlockHash)
return beacon.STATUS_INVALID, errors.New("final block not canonical")
return beacon.STATUS_INVALID, &beacon.InvalidForkChoiceState
}
// Set the finalized block
api.eth.BlockChain().SetFinalized(finalBlock)
@ -178,11 +179,11 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV1(update beacon.ForkchoiceStateV1, pa
safeBlock := api.eth.BlockChain().GetBlockByHash(update.SafeBlockHash)
if safeBlock == nil {
log.Warn("Safe block not available in database")
return beacon.STATUS_INVALID, errors.New("safe head not available")
return beacon.STATUS_INVALID, &beacon.InvalidForkChoiceState
}
if rawdb.ReadCanonicalHash(api.eth.ChainDb(), safeBlock.NumberU64()) != update.SafeBlockHash {
log.Warn("Safe block not in canonical chain")
return beacon.STATUS_INVALID, errors.New("safe head not canonical")
return beacon.STATUS_INVALID, &beacon.InvalidForkChoiceState
}
}
@ -200,13 +201,15 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV1(update beacon.ForkchoiceStateV1, pa
// Create an empty block first which can be used as a fallback
empty, err := api.eth.Miner().GetSealingBlockSync(update.HeadBlockHash, payloadAttributes.Timestamp, payloadAttributes.SuggestedFeeRecipient, payloadAttributes.Random, true)
if err != nil {
return valid(nil), err
log.Error("Failed to create empty sealing payload", "err", err)
return valid(nil), &beacon.InvalidPayloadAttributes
}
// Send a request to generate a full block in the background.
// The result can be obtained via the returned channel.
resCh, err := api.eth.Miner().GetSealingBlockAsync(update.HeadBlockHash, payloadAttributes.Timestamp, payloadAttributes.SuggestedFeeRecipient, payloadAttributes.Random, false)
if err != nil {
return valid(nil), err
log.Error("Failed to create async sealing payload", "err", err)
return valid(nil), &beacon.InvalidPayloadAttributes
}
id := computePayloadId(update.HeadBlockHash, payloadAttributes)
api.localBlocks.put(id, &payload{empty: empty, result: resCh})
@ -303,7 +306,7 @@ func (api *ConsensusAPI) NewPayloadV1(params beacon.ExecutableDataV1) (beacon.Pa
}
if block.Time() <= parent.Time() {
log.Warn("Invalid timestamp", "parent", block.Time(), "block", block.Time())
return api.invalid(errors.New("invalid timestamp")), nil
return api.invalid(errors.New("invalid timestamp"), parent), nil
}
if !api.eth.BlockChain().HasBlockAndState(block.ParentHash(), block.NumberU64()-1) {
api.remoteBlocks.put(block.Hash(), block.Header())
@ -313,7 +316,7 @@ func (api *ConsensusAPI) NewPayloadV1(params beacon.ExecutableDataV1) (beacon.Pa
log.Trace("Inserting block without sethead", "hash", block.Hash(), "number", block.Number)
if err := api.eth.BlockChain().InsertBlockWithoutSetHead(block); err != nil {
log.Warn("NewPayloadV1: inserting block failed", "error", err)
return api.invalid(err), nil
return api.invalid(err, parent), nil
}
// We've accepted a valid payload from the beacon client. Mark the local
// chain transitions to notify other subsystems (e.g. downloader) of the
@ -339,9 +342,13 @@ func computePayloadId(headBlockHash common.Hash, params *beacon.PayloadAttribute
return out
}
// invalid returns a response "INVALID" with the latest valid hash set to the current head.
func (api *ConsensusAPI) invalid(err error) beacon.PayloadStatusV1 {
currentHash := api.eth.BlockChain().CurrentHeader().Hash()
// invalid returns a response "INVALID" with the latest valid hash supplied by latest or to the current head
// if no latestValid block was provided.
func (api *ConsensusAPI) invalid(err error, latestValid *types.Block) beacon.PayloadStatusV1 {
currentHash := api.eth.BlockChain().CurrentBlock().Hash()
if latestValid != nil {
currentHash = latestValid.Hash()
}
errorMsg := err.Error()
return beacon.PayloadStatusV1{Status: beacon.INVALID, LatestValidHash: &currentHash, ValidationError: &errorMsg}
}

View File

@ -17,9 +17,9 @@
package catalyst
import (
"bytes"
"fmt"
"math/big"
"os"
"testing"
"time"
@ -32,10 +32,12 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"
)
var (
@ -142,47 +144,44 @@ func TestSetHeadBeforeTotalDifficulty(t *testing.T) {
}
func TestEth2PrepareAndGetPayload(t *testing.T) {
// TODO (MariusVanDerWijden) TestEth2PrepareAndGetPayload is currently broken, fixed in upcoming merge-kiln-v2 pr
/*
genesis, blocks := generatePreMergeChain(10)
// We need to properly set the terminal total difficulty
genesis.Config.TerminalTotalDifficulty.Sub(genesis.Config.TerminalTotalDifficulty, blocks[9].Difficulty())
n, ethservice := startEthService(t, genesis, blocks[:9])
defer n.Close()
genesis, blocks := generatePreMergeChain(10)
// We need to properly set the terminal total difficulty
genesis.Config.TerminalTotalDifficulty.Sub(genesis.Config.TerminalTotalDifficulty, blocks[9].Difficulty())
n, ethservice := startEthService(t, genesis, blocks[:9])
defer n.Close()
api := NewConsensusAPI(ethservice)
api := NewConsensusAPI(ethservice)
// Put the 10th block's tx in the pool and produce a new block
api.insertTransactions(blocks[9].Transactions())
blockParams := beacon.PayloadAttributesV1{
Timestamp: blocks[8].Time() + 5,
}
fcState := beacon.ForkchoiceStateV1{
HeadBlockHash: blocks[8].Hash(),
SafeBlockHash: common.Hash{},
FinalizedBlockHash: common.Hash{},
}
_, err := api.ForkchoiceUpdatedV1(fcState, &blockParams)
if err != nil {
t.Fatalf("error preparing payload, err=%v", err)
}
payloadID := computePayloadId(fcState.HeadBlockHash, &blockParams)
execData, err := api.GetPayloadV1(payloadID)
if err != nil {
t.Fatalf("error getting payload, err=%v", err)
}
if len(execData.Transactions) != blocks[9].Transactions().Len() {
t.Fatalf("invalid number of transactions %d != 1", len(execData.Transactions))
}
// Test invalid payloadID
var invPayload beacon.PayloadID
copy(invPayload[:], payloadID[:])
invPayload[0] = ^invPayload[0]
_, err = api.GetPayloadV1(invPayload)
if err == nil {
t.Fatal("expected error retrieving invalid payload")
}
*/
// Put the 10th block's tx in the pool and produce a new block
ethservice.TxPool().AddLocals(blocks[9].Transactions())
blockParams := beacon.PayloadAttributesV1{
Timestamp: blocks[8].Time() + 5,
}
fcState := beacon.ForkchoiceStateV1{
HeadBlockHash: blocks[8].Hash(),
SafeBlockHash: common.Hash{},
FinalizedBlockHash: common.Hash{},
}
_, err := api.ForkchoiceUpdatedV1(fcState, &blockParams)
if err != nil {
t.Fatalf("error preparing payload, err=%v", err)
}
payloadID := computePayloadId(fcState.HeadBlockHash, &blockParams)
execData, err := api.GetPayloadV1(payloadID)
if err != nil {
t.Fatalf("error getting payload, err=%v", err)
}
if len(execData.Transactions) != blocks[9].Transactions().Len() {
t.Fatalf("invalid number of transactions %d != 1", len(execData.Transactions))
}
// Test invalid payloadID
var invPayload beacon.PayloadID
copy(invPayload[:], payloadID[:])
invPayload[0] = ^invPayload[0]
_, err = api.GetPayloadV1(invPayload)
if err == nil {
t.Fatal("expected error retrieving invalid payload")
}
}
func checkLogEvents(t *testing.T, logsCh <-chan []*types.Log, rmLogsCh <-chan core.RemovedLogsEvent, wantNew, wantRemoved int) {
@ -396,13 +395,17 @@ func TestEth2DeepReorg(t *testing.T) {
func startEthService(t *testing.T, genesis *core.Genesis, blocks []*types.Block) (*node.Node, *eth.Ethereum) {
t.Helper()
// Disable verbose log output which is noise to some extent.
log.Root().SetHandler(log.LvlFilterHandler(log.LvlCrit, log.StreamHandler(os.Stderr, log.TerminalFormat(true))))
n, err := node.New(&node.Config{})
n, err := node.New(&node.Config{
P2P: p2p.Config{
ListenAddr: "0.0.0.0:0",
NoDiscovery: true,
MaxPeers: 25,
}})
if err != nil {
t.Fatal("can't create node:", err)
}
ethcfg := &ethconfig.Config{Genesis: genesis, Ethash: ethash.Config{PowMode: ethash.ModeFake}, TrieTimeout: time.Minute, TrieDirtyCache: 256, TrieCleanCache: 256}
ethcfg := &ethconfig.Config{Genesis: genesis, Ethash: ethash.Config{PowMode: ethash.ModeFake}, SyncMode: downloader.SnapSync, TrieTimeout: time.Minute, TrieDirtyCache: 256, TrieCleanCache: 256}
ethservice, err := eth.New(n, ethcfg)
if err != nil {
t.Fatal("can't create eth service:", err)
@ -427,39 +430,28 @@ func TestFullAPI(t *testing.T) {
ethservice.Merger().ReachTTD()
defer n.Close()
var (
api = NewConsensusAPI(ethservice)
parent = ethservice.BlockChain().CurrentBlock()
// This EVM code generates a log when the contract is created.
logCode = common.Hex2Bytes("60606040525b7f24ec1d3ff24c2f6ff210738839dbc339cd45a5294d85c79361016243157aae7b60405180905060405180910390a15b600a8060416000396000f360606040526008565b00")
)
for i := 0; i < 10; i++ {
callback := func(parent *types.Block) {
statedb, _ := ethservice.BlockChain().StateAt(parent.Root())
nonce := statedb.GetNonce(testAddr)
tx, _ := types.SignTx(types.NewContractCreation(nonce, new(big.Int), 1000000, big.NewInt(2*params.InitialBaseFee), logCode), types.LatestSigner(ethservice.BlockChain().Config()), testKey)
ethservice.TxPool().AddLocal(tx)
}
params := beacon.PayloadAttributesV1{
Timestamp: parent.Time() + 1,
Random: crypto.Keccak256Hash([]byte{byte(i)}),
SuggestedFeeRecipient: parent.Coinbase(),
}
setupBlocks(t, ethservice, 10, parent, callback)
}
func setupBlocks(t *testing.T, ethservice *eth.Ethereum, n int, parent *types.Block, callback func(parent *types.Block)) {
api := NewConsensusAPI(ethservice)
for i := 0; i < n; i++ {
callback(parent)
payload := getNewPayload(t, api, parent)
fcState := beacon.ForkchoiceStateV1{
HeadBlockHash: parent.Hash(),
SafeBlockHash: common.Hash{},
FinalizedBlockHash: common.Hash{},
}
resp, err := api.ForkchoiceUpdatedV1(fcState, &params)
if err != nil {
t.Fatalf("error preparing payload, err=%v", err)
}
if resp.PayloadStatus.Status != beacon.VALID {
t.Fatalf("error preparing payload, invalid status: %v", resp.PayloadStatus.Status)
}
payload, err := api.GetPayloadV1(*resp.PayloadID)
if err != nil {
t.Fatalf("can't get payload: %v", err)
}
execResp, err := api.NewPayloadV1(*payload)
if err != nil {
t.Fatalf("can't execute payload: %v", err)
@ -467,7 +459,7 @@ func TestFullAPI(t *testing.T) {
if execResp.Status != beacon.VALID {
t.Fatalf("invalid status: %v", execResp.Status)
}
fcState = beacon.ForkchoiceStateV1{
fcState := beacon.ForkchoiceStateV1{
HeadBlockHash: payload.BlockHash,
SafeBlockHash: payload.ParentHash,
FinalizedBlockHash: payload.ParentHash,
@ -531,11 +523,29 @@ func TestExchangeTransitionConfig(t *testing.T) {
}
}
func TestEmptyBlocks(t *testing.T) {
/*
TestNewPayloadOnInvalidChain sets up a valid chain and tries to feed blocks
from an invalid chain to test if latestValidHash (LVH) works correctly.
We set up the following chain where P1 ... Pn and P1'' are valid while
P1' is invalid.
We expect
(1) The LVH to point to the current inserted payload if it was valid.
(2) The LVH to point to the valid parent on an invalid payload (if the parent is available).
(3) If the parent is unavailable, the LVH should not be set.
CommonAncestor P1 P2 P3 ... Pn
P1' P2' P3' ... Pn'
P1''
*/
func TestNewPayloadOnInvalidChain(t *testing.T) {
genesis, preMergeBlocks := generatePreMergeChain(10)
n, ethservice := startEthService(t, genesis, preMergeBlocks)
ethservice.Merger().ReachTTD()
defer n.Close()
var (
api = NewConsensusAPI(ethservice)
parent = ethservice.BlockChain().CurrentBlock()
@ -604,3 +614,183 @@ func assembleBlock(api *ConsensusAPI, parentHash common.Hash, params *beacon.Pay
}
return beacon.BlockToExecutableData(block), nil
}
func TestEmptyBlocks(t *testing.T) {
genesis, preMergeBlocks := generatePreMergeChain(10)
n, ethservice := startEthService(t, genesis, preMergeBlocks)
ethservice.Merger().ReachTTD()
defer n.Close()
commonAncestor := ethservice.BlockChain().CurrentBlock()
api := NewConsensusAPI(ethservice)
// Setup 10 blocks on the canonical chain
setupBlocks(t, ethservice, 10, commonAncestor, func(parent *types.Block) {})
// (1) check LatestValidHash by sending a normal payload (P1'')
payload := getNewPayload(t, api, commonAncestor)
status, err := api.NewPayloadV1(*payload)
if err != nil {
t.Fatal(err)
}
if status.Status != beacon.VALID {
t.Errorf("invalid status: expected VALID got: %v", status.Status)
}
if !bytes.Equal(status.LatestValidHash[:], payload.BlockHash[:]) {
t.Fatalf("invalid LVH: got %v want %v", status.LatestValidHash, payload.BlockHash)
}
// (2) Now send P1' which is invalid
payload = getNewPayload(t, api, commonAncestor)
payload.GasUsed += 1
payload = setBlockhash(payload)
// Now latestValidHash should be the common ancestor
status, err = api.NewPayloadV1(*payload)
if err != nil {
t.Fatal(err)
}
if status.Status != beacon.INVALID {
t.Errorf("invalid status: expected INVALID got: %v", status.Status)
}
expected := commonAncestor.Hash()
if !bytes.Equal(status.LatestValidHash[:], expected[:]) {
t.Fatalf("invalid LVH: got %v want %v", status.LatestValidHash, expected)
}
// (3) Now send a payload with unknown parent
payload = getNewPayload(t, api, commonAncestor)
payload.ParentHash = common.Hash{1}
payload = setBlockhash(payload)
// Now latestValidHash should be the common ancestor
status, err = api.NewPayloadV1(*payload)
if err != nil {
t.Fatal(err)
}
if status.Status != beacon.ACCEPTED {
t.Errorf("invalid status: expected ACCEPTED got: %v", status.Status)
}
if status.LatestValidHash != nil {
t.Fatalf("invalid LVH: got %v wanted nil", status.LatestValidHash)
}
}
func getNewPayload(t *testing.T, api *ConsensusAPI, parent *types.Block) *beacon.ExecutableDataV1 {
params := beacon.PayloadAttributesV1{
Timestamp: parent.Time() + 1,
Random: crypto.Keccak256Hash([]byte{byte(1)}),
SuggestedFeeRecipient: parent.Coinbase(),
}
payload, err := assembleBlock(api, parent.Hash(), &params)
if err != nil {
t.Fatal(err)
}
return payload
}
// setBlockhash sets the blockhash of a modified ExecutableData.
// Can be used to make modified payloads look valid.
func setBlockhash(data *beacon.ExecutableDataV1) *beacon.ExecutableDataV1 {
txs, _ := decodeTransactions(data.Transactions)
number := big.NewInt(0)
number.SetUint64(data.Number)
header := &types.Header{
ParentHash: data.ParentHash,
UncleHash: types.EmptyUncleHash,
Coinbase: data.FeeRecipient,
Root: data.StateRoot,
TxHash: types.DeriveSha(types.Transactions(txs), trie.NewStackTrie(nil)),
ReceiptHash: data.ReceiptsRoot,
Bloom: types.BytesToBloom(data.LogsBloom),
Difficulty: common.Big0,
Number: number,
GasLimit: data.GasLimit,
GasUsed: data.GasUsed,
Time: data.Timestamp,
BaseFee: data.BaseFeePerGas,
Extra: data.ExtraData,
MixDigest: data.Random,
}
block := types.NewBlockWithHeader(header).WithBody(txs, nil /* uncles */)
data.BlockHash = block.Hash()
return data
}
func decodeTransactions(enc [][]byte) ([]*types.Transaction, error) {
var txs = make([]*types.Transaction, len(enc))
for i, encTx := range enc {
var tx types.Transaction
if err := tx.UnmarshalBinary(encTx); err != nil {
return nil, fmt.Errorf("invalid transaction %d: %v", i, err)
}
txs[i] = &tx
}
return txs, nil
}
func TestTrickRemoteBlockCache(t *testing.T) {
// Setup two nodes
genesis, preMergeBlocks := generatePreMergeChain(10)
nodeA, ethserviceA := startEthService(t, genesis, preMergeBlocks)
nodeB, ethserviceB := startEthService(t, genesis, preMergeBlocks)
ethserviceA.Merger().ReachTTD()
ethserviceB.Merger().ReachTTD()
defer nodeA.Close()
defer nodeB.Close()
for nodeB.Server().NodeInfo().Ports.Listener == 0 {
time.Sleep(250 * time.Millisecond)
}
nodeA.Server().AddPeer(nodeB.Server().Self())
nodeB.Server().AddPeer(nodeA.Server().Self())
apiA := NewConsensusAPI(ethserviceA)
apiB := NewConsensusAPI(ethserviceB)
commonAncestor := ethserviceA.BlockChain().CurrentBlock()
// Setup 10 blocks on the canonical chain
setupBlocks(t, ethserviceA, 10, commonAncestor, func(parent *types.Block) {})
commonAncestor = ethserviceA.BlockChain().CurrentBlock()
var invalidChain []*beacon.ExecutableDataV1
// create a valid payload (P1)
//payload1 := getNewPayload(t, apiA, commonAncestor)
//invalidChain = append(invalidChain, payload1)
// create an invalid payload2 (P2)
payload2 := getNewPayload(t, apiA, commonAncestor)
//payload2.ParentHash = payload1.BlockHash
payload2.GasUsed += 1
payload2 = setBlockhash(payload2)
invalidChain = append(invalidChain, payload2)
head := payload2
// create some valid payloads on top
for i := 0; i < 10; i++ {
payload := getNewPayload(t, apiA, commonAncestor)
payload.ParentHash = head.BlockHash
payload = setBlockhash(payload)
invalidChain = append(invalidChain, payload)
head = payload
}
// feed the payloads to node B
for _, payload := range invalidChain {
status, err := apiB.NewPayloadV1(*payload)
if err != nil {
panic(err)
}
if status.Status == beacon.INVALID {
panic("success")
}
// Now reorg to the head of the invalid chain
resp, err := apiB.ForkchoiceUpdatedV1(beacon.ForkchoiceStateV1{HeadBlockHash: payload.BlockHash, SafeBlockHash: payload.BlockHash, FinalizedBlockHash: payload.ParentHash}, nil)
if err != nil {
t.Fatal(err)
}
if resp.PayloadStatus.Status == beacon.VALID {
t.Errorf("invalid status: expected INVALID got: %v", resp.PayloadStatus.Status)
}
time.Sleep(100 * time.Millisecond)
}
}

View File

@ -161,7 +161,7 @@ func (api *ConsensusAPI) checkTerminalTotalDifficulty(head common.Hash) error {
}
td := api.les.BlockChain().GetTd(header.Hash(), header.Number.Uint64())
if td != nil && td.Cmp(api.les.BlockChain().Config().TerminalTotalDifficulty) < 0 {
return &beacon.InvalidTB
return errors.New("invalid ttd")
}
return nil
}