diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index 754d8b214..2756a02e2 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -120,6 +120,7 @@ type ConsensusAPI struct { lastNewPayloadLock sync.Mutex forkchoiceLock sync.Mutex // Lock for the forkChoiceUpdated method + newPayloadLock sync.Mutex // Lock for the NewPayload method } // NewConsensusAPI creates a new consensus api for the given backend. @@ -342,6 +343,22 @@ func (api *ConsensusAPI) GetPayloadV1(payloadID beacon.PayloadID) (*beacon.Execu // NewPayloadV1 creates an Eth1 block, inserts it in the chain, and returns the status of the chain. func (api *ConsensusAPI) NewPayloadV1(params beacon.ExecutableDataV1) (beacon.PayloadStatusV1, error) { + // The locking here is, strictly, not required. Without these locks, this can happen: + // + // 1. NewPayload( execdata-N ) is invoked from the CL. It goes all the way down to + // api.eth.BlockChain().InsertBlockWithoutSetHead, where it is blocked on + // e.g database compaction. + // 2. The call times out on the CL layer, which issues another NewPayload (execdata-N) call. + // Similarly, this also get stuck on the same place. Importantly, since the + // first call has not gone through, the early checks for "do we already have this block" + // will all return false. + // 3. When the db compaction ends, then N calls inserting the same payload are processed + // sequentially. + // Hence, we use a lock here, to be sure that the previous call has finished before we + // check whether we already have the block locally. + api.newPayloadLock.Lock() + defer api.newPayloadLock.Unlock() + log.Trace("Engine API request received", "method", "ExecutePayload", "number", params.Number, "hash", params.BlockHash) block, err := beacon.ExecutableDataToBlock(params) if err != nil { diff --git a/eth/catalyst/api_test.go b/eth/catalyst/api_test.go index ae53462ff..ec26f35e8 100644 --- a/eth/catalyst/api_test.go +++ b/eth/catalyst/api_test.go @@ -20,6 +20,7 @@ import ( "bytes" "fmt" "math/big" + "sync" "testing" "time" @@ -277,10 +278,12 @@ func TestEth2NewBlock(t *testing.T) { t.Fatalf("Failed to convert executable data to block %v", err) } newResp, err := api.NewPayloadV1(*execData) - if err != nil || newResp.Status != "VALID" { + switch { + case err != nil: t.Fatalf("Failed to insert block: %v", err) - } - if ethservice.BlockChain().CurrentBlock().NumberU64() != block.NumberU64()-1 { + case newResp.Status != "VALID": + t.Fatalf("Failed to insert block: %v", newResp.Status) + case ethservice.BlockChain().CurrentBlock().NumberU64() != block.NumberU64()-1: t.Fatalf("Chain head shouldn't be updated") } checkLogEvents(t, newLogCh, rmLogsCh, 0, 0) @@ -292,8 +295,8 @@ func TestEth2NewBlock(t *testing.T) { if _, err := api.ForkchoiceUpdatedV1(fcState, nil); err != nil { t.Fatalf("Failed to insert block: %v", err) } - if ethservice.BlockChain().CurrentBlock().NumberU64() != block.NumberU64() { - t.Fatalf("Chain head should be updated") + if have, want := ethservice.BlockChain().CurrentBlock().NumberU64(), block.NumberU64(); have != want { + t.Fatalf("Chain head should be updated, have %d want %d", have, want) } checkLogEvents(t, newLogCh, rmLogsCh, 1, 0) @@ -855,3 +858,102 @@ func TestNewPayloadOnInvalidTerminalBlock(t *testing.T) { t.Fatalf("error sending invalid forkchoice, invalid status: %v", resp.PayloadStatus.Status) } } + +// TestSimultaneousNewBlock does several parallel inserts, both as +// newPayLoad and forkchoiceUpdate. This is to test that the api behaves +// well even of the caller is not being 'serial'. +func TestSimultaneousNewBlock(t *testing.T) { + genesis, preMergeBlocks := generatePreMergeChain(10) + n, ethservice := startEthService(t, genesis, preMergeBlocks) + defer n.Close() + + var ( + api = NewConsensusAPI(ethservice) + parent = preMergeBlocks[len(preMergeBlocks)-1] + ) + for i := 0; i < 10; i++ { + statedb, _ := ethservice.BlockChain().StateAt(parent.Root()) + ethservice.TxPool().AddLocal(types.MustSignNewTx(testKey, types.LatestSigner(ethservice.BlockChain().Config()), + &types.DynamicFeeTx{ + Nonce: statedb.GetNonce(testAddr), + Value: big.NewInt(0), + GasFeeCap: big.NewInt(2 * params.InitialBaseFee), + GasTipCap: big.NewInt(2 * params.InitialBaseFee), + ChainID: genesis.Config.ChainID, + Gas: 1000000, + To: &common.Address{99}, + })) + execData, err := assembleBlock(api, parent.Hash(), &beacon.PayloadAttributesV1{ + Timestamp: parent.Time() + 5, + }) + if err != nil { + t.Fatalf("Failed to create the executable data %v", err) + } + // Insert it 10 times in parallel. Should be ignored. + { + var ( + wg sync.WaitGroup + testErr error + errMu sync.Mutex + ) + wg.Add(10) + for ii := 0; ii < 10; ii++ { + go func() { + defer wg.Done() + if newResp, err := api.NewPayloadV1(*execData); err != nil { + errMu.Lock() + testErr = fmt.Errorf("Failed to insert block: %w", err) + errMu.Unlock() + } else if newResp.Status != "VALID" { + errMu.Lock() + testErr = fmt.Errorf("Failed to insert block: %v", newResp.Status) + errMu.Unlock() + } + }() + } + wg.Wait() + if testErr != nil { + t.Fatal(testErr) + } + } + block, err := beacon.ExecutableDataToBlock(*execData) + if err != nil { + t.Fatalf("Failed to convert executable data to block %v", err) + } + if ethservice.BlockChain().CurrentBlock().NumberU64() != block.NumberU64()-1 { + t.Fatalf("Chain head shouldn't be updated") + } + fcState := beacon.ForkchoiceStateV1{ + HeadBlockHash: block.Hash(), + SafeBlockHash: block.Hash(), + FinalizedBlockHash: block.Hash(), + } + { + var ( + wg sync.WaitGroup + testErr error + errMu sync.Mutex + ) + wg.Add(10) + // Do each FCU 10 times + for ii := 0; ii < 10; ii++ { + go func() { + defer wg.Done() + if _, err := api.ForkchoiceUpdatedV1(fcState, nil); err != nil { + errMu.Lock() + testErr = fmt.Errorf("Failed to insert block: %w", err) + errMu.Unlock() + } + }() + } + wg.Wait() + if testErr != nil { + t.Fatal(testErr) + } + } + if have, want := ethservice.BlockChain().CurrentBlock().NumberU64(), block.NumberU64(); have != want { + t.Fatalf("Chain head should be updated, have %d want %d", have, want) + } + parent = block + } +}