eth: support bubbling up bad blocks from sync to the engine API (#25190)
* eth: support bubbling up bad blocks from sync to the engine API * eth/catalyst: fix typo Co-authored-by: Marius van der Wijden <m.vanderwijden@live.de> * eth/catalyst: fix typo Co-authored-by: Marius van der Wijden <m.vanderwijden@live.de> * Update eth/catalyst/api.go * eth/catalyst: when forgetting bad hashes, also forget descendants * eth/catalyst: minor bad block tweaks for resilience Co-authored-by: Marius van der Wijden <m.vanderwijden@live.de> Co-authored-by: Martin Holst Swende <martin@swende.se>
This commit is contained in:
parent
2b6a761238
commit
f3af3fd8df
@ -42,7 +42,7 @@ type payloadAttributesMarshaling struct {
|
||||
|
||||
//go:generate go run github.com/fjl/gencodec -type ExecutableDataV1 -field-override executableDataMarshaling -out gen_ed.go
|
||||
|
||||
// ExecutableDataV1 structure described at https://github.com/ethereum/execution-apis/src/engine/specification.md
|
||||
// ExecutableDataV1 structure described at https://github.com/ethereum/execution-apis/tree/main/src/engine/specification.md
|
||||
type ExecutableDataV1 struct {
|
||||
ParentHash common.Hash `json:"parentHash" gencodec:"required"`
|
||||
FeeRecipient common.Address `json:"feeRecipient" gencodec:"required"`
|
||||
|
@ -50,12 +50,47 @@ func Register(stack *node.Node, backend *eth.Ethereum) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
const (
|
||||
// invalidBlockHitEviction is the number of times an invalid block can be
|
||||
// referenced in forkchoice update or new payload before it is attempted
|
||||
// to be reprocessed again.
|
||||
invalidBlockHitEviction = 128
|
||||
|
||||
// invalidTipsetsCap is the max number of recent block hashes tracked that
|
||||
// have lead to some bad ancestor block. It's just an OOM protection.
|
||||
invalidTipsetsCap = 512
|
||||
)
|
||||
|
||||
type ConsensusAPI struct {
|
||||
eth *eth.Ethereum
|
||||
|
||||
remoteBlocks *headerQueue // Cache of remote payloads received
|
||||
localBlocks *payloadQueue // Cache of local payloads generated
|
||||
// Lock for the forkChoiceUpdated method
|
||||
forkChoiceLock sync.Mutex
|
||||
|
||||
// The forkchoice update and new payload method require us to return the
|
||||
// latest valid hash in an invalid chain. To support that return, we need
|
||||
// to track historical bad blocks as well as bad tipsets in case a chain
|
||||
// is constantly built on it.
|
||||
//
|
||||
// There are a few important caveats in this mechanism:
|
||||
// - The bad block tracking is ephemeral, in-memory only. We must never
|
||||
// persist any bad block information to disk as a bug in Geth could end
|
||||
// up blocking a valid chain, even if a later Geth update would accept
|
||||
// it.
|
||||
// - Bad blocks will get forgotten after a certain threshold of import
|
||||
// attempts and will be retried. The rationale is that if the network
|
||||
// really-really-really tries to feed us a block, we should give it a
|
||||
// new chance, perhaps us being racey instead of the block being legit
|
||||
// bad (this happened in Geth at a point with import vs. pending race).
|
||||
// - Tracking all the blocks built on top of the bad one could be a bit
|
||||
// problematic, so we will only track the head chain segment of a bad
|
||||
// chain to allow discarding progressing bad chains and side chains,
|
||||
// without tracking too much bad data.
|
||||
invalidBlocksHits map[common.Hash]int // Emhemeral cache to track invalid blocks and their hit count
|
||||
invalidTipsets map[common.Hash]*types.Header // Ephemeral cache to track invalid tipsets and their bad ancestor
|
||||
invalidLock sync.Mutex // Protects the invalid maps from concurrent access
|
||||
|
||||
forkChoiceLock sync.Mutex // Lock for the forkChoiceUpdated method
|
||||
}
|
||||
|
||||
// NewConsensusAPI creates a new consensus api for the given backend.
|
||||
@ -64,11 +99,16 @@ func NewConsensusAPI(eth *eth.Ethereum) *ConsensusAPI {
|
||||
if eth.BlockChain().Config().TerminalTotalDifficulty == nil {
|
||||
log.Warn("Engine API started but chain not configured for merge yet")
|
||||
}
|
||||
return &ConsensusAPI{
|
||||
api := &ConsensusAPI{
|
||||
eth: eth,
|
||||
remoteBlocks: newHeaderQueue(),
|
||||
localBlocks: newPayloadQueue(),
|
||||
invalidBlocksHits: make(map[common.Hash]int),
|
||||
invalidTipsets: make(map[common.Hash]*types.Header),
|
||||
}
|
||||
eth.Downloader().SetBadBlockCallback(api.setInvalidAncestor)
|
||||
|
||||
return api
|
||||
}
|
||||
|
||||
// ForkchoiceUpdatedV1 has several responsibilities:
|
||||
@ -96,6 +136,10 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV1(update beacon.ForkchoiceStateV1, pa
|
||||
// reason.
|
||||
block := api.eth.BlockChain().GetBlockByHash(update.HeadBlockHash)
|
||||
if block == nil {
|
||||
// If this block was previously invalidated, keep rejecting it here too
|
||||
if res := api.checkInvalidAncestor(update.HeadBlockHash, update.HeadBlockHash); res != nil {
|
||||
return beacon.ForkChoiceResponse{PayloadStatus: *res, PayloadID: nil}, nil
|
||||
}
|
||||
// If the head hash is unknown (was not given to us in a newPayload request),
|
||||
// we cannot resolve the header, so not much to do. This could be extended in
|
||||
// the future to resolve from the `eth` network, but it's an unexpected case
|
||||
@ -266,6 +310,10 @@ func (api *ConsensusAPI) NewPayloadV1(params beacon.ExecutableDataV1) (beacon.Pa
|
||||
hash := block.Hash()
|
||||
return beacon.PayloadStatusV1{Status: beacon.VALID, LatestValidHash: &hash}, nil
|
||||
}
|
||||
// If this block was rejected previously, keep rejecting it
|
||||
if res := api.checkInvalidAncestor(block.Hash(), block.Hash()); res != nil {
|
||||
return *res, nil
|
||||
}
|
||||
// If the parent is missing, we - in theory - could trigger a sync, but that
|
||||
// would also entail a reorg. That is problematic if multiple sibling blocks
|
||||
// are being fed to us, and even more so, if some semi-distant uncle shortens
|
||||
@ -293,7 +341,7 @@ func (api *ConsensusAPI) NewPayloadV1(params beacon.ExecutableDataV1) (beacon.Pa
|
||||
}
|
||||
if block.Time() <= parent.Time() {
|
||||
log.Warn("Invalid timestamp", "parent", block.Time(), "block", block.Time())
|
||||
return api.invalid(errors.New("invalid timestamp"), parent), nil
|
||||
return api.invalid(errors.New("invalid timestamp"), parent.Header()), nil
|
||||
}
|
||||
// Another cornercase: if the node is in snap sync mode, but the CL client
|
||||
// tries to make it import a block. That should be denied as pushing something
|
||||
@ -310,7 +358,13 @@ func (api *ConsensusAPI) NewPayloadV1(params beacon.ExecutableDataV1) (beacon.Pa
|
||||
log.Trace("Inserting block without sethead", "hash", block.Hash(), "number", block.Number)
|
||||
if err := api.eth.BlockChain().InsertBlockWithoutSetHead(block); err != nil {
|
||||
log.Warn("NewPayloadV1: inserting block failed", "error", err)
|
||||
return api.invalid(err, parent), nil
|
||||
|
||||
api.invalidLock.Lock()
|
||||
api.invalidBlocksHits[block.Hash()] = 1
|
||||
api.invalidTipsets[block.Hash()] = block.Header()
|
||||
api.invalidLock.Unlock()
|
||||
|
||||
return api.invalid(err, parent.Header()), nil
|
||||
}
|
||||
// We've accepted a valid payload from the beacon client. Mark the local
|
||||
// chain transitions to notify other subsystems (e.g. downloader) of the
|
||||
@ -339,8 +393,13 @@ func computePayloadId(headBlockHash common.Hash, params *beacon.PayloadAttribute
|
||||
// delayPayloadImport stashes the given block away for import at a later time,
|
||||
// either via a forkchoice update or a sync extension. This method is meant to
|
||||
// be called by the newpayload command when the block seems to be ok, but some
|
||||
// prerequisite prevents it from being processed (e.g. no parent, or nap sync).
|
||||
// prerequisite prevents it from being processed (e.g. no parent, or snap sync).
|
||||
func (api *ConsensusAPI) delayPayloadImport(block *types.Block) (beacon.PayloadStatusV1, error) {
|
||||
// Sanity check that this block's parent is not on a previously invalidated
|
||||
// chain. If it is, mark the block as invalid too.
|
||||
if res := api.checkInvalidAncestor(block.ParentHash(), block.Hash()); res != nil {
|
||||
return *res, nil
|
||||
}
|
||||
// Stash the block away for a potential forced forkchoice update to it
|
||||
// at a later time.
|
||||
api.remoteBlocks.put(block.Hash(), block.Header())
|
||||
@ -360,14 +419,70 @@ func (api *ConsensusAPI) delayPayloadImport(block *types.Block) (beacon.PayloadS
|
||||
return beacon.PayloadStatusV1{Status: beacon.ACCEPTED}, nil
|
||||
}
|
||||
|
||||
// setInvalidAncestor is a callback for the downloader to notify us if a bad block
|
||||
// is encountered during the async sync.
|
||||
func (api *ConsensusAPI) setInvalidAncestor(invalid *types.Header, origin *types.Header) {
|
||||
api.invalidLock.Lock()
|
||||
defer api.invalidLock.Unlock()
|
||||
|
||||
api.invalidTipsets[origin.Hash()] = invalid
|
||||
api.invalidBlocksHits[invalid.Hash()]++
|
||||
}
|
||||
|
||||
// checkInvalidAncestor checks whether the specified chain end links to a known
|
||||
// bad ancestor. If yes, it constructs the payload failure response to return.
|
||||
func (api *ConsensusAPI) checkInvalidAncestor(check common.Hash, head common.Hash) *beacon.PayloadStatusV1 {
|
||||
api.invalidLock.Lock()
|
||||
defer api.invalidLock.Unlock()
|
||||
|
||||
// If the hash to check is unknown, return valid
|
||||
invalid, ok := api.invalidTipsets[check]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
// If the bad hash was hit too many times, evict it and try to reprocess in
|
||||
// the hopes that we have a data race that we can exit out of.
|
||||
badHash := invalid.Hash()
|
||||
|
||||
api.invalidBlocksHits[badHash]++
|
||||
if api.invalidBlocksHits[badHash] >= invalidBlockHitEviction {
|
||||
log.Warn("Too many bad block import attempt, trying", "number", invalid.Number, "hash", badHash)
|
||||
delete(api.invalidBlocksHits, badHash)
|
||||
|
||||
for descendant, badHeader := range api.invalidTipsets {
|
||||
if badHeader.Hash() == badHash {
|
||||
delete(api.invalidTipsets, descendant)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
// Not too many failures yet, mark the head of the invalid chain as invalid
|
||||
if check != head {
|
||||
log.Warn("Marked new chain head as invalid", "hash", head, "badnumber", invalid.Number, "badhash", badHash)
|
||||
for len(api.invalidTipsets) >= invalidTipsetsCap {
|
||||
for key := range api.invalidTipsets {
|
||||
delete(api.invalidTipsets, key)
|
||||
break
|
||||
}
|
||||
}
|
||||
api.invalidTipsets[head] = invalid
|
||||
}
|
||||
failure := "links to previously rejected block"
|
||||
return &beacon.PayloadStatusV1{
|
||||
Status: beacon.INVALID,
|
||||
LatestValidHash: &invalid.ParentHash,
|
||||
ValidationError: &failure,
|
||||
}
|
||||
}
|
||||
|
||||
// invalid returns a response "INVALID" with the latest valid hash supplied by latest or to the current head
|
||||
// if no latestValid block was provided.
|
||||
func (api *ConsensusAPI) invalid(err error, latestValid *types.Block) beacon.PayloadStatusV1 {
|
||||
func (api *ConsensusAPI) invalid(err error, latestValid *types.Header) beacon.PayloadStatusV1 {
|
||||
currentHash := api.eth.BlockChain().CurrentBlock().Hash()
|
||||
if latestValid != nil {
|
||||
// Set latest valid hash to 0x0 if parent is PoW block
|
||||
currentHash = common.Hash{}
|
||||
if latestValid.Difficulty().BitLen() == 0 {
|
||||
if latestValid.Difficulty.BitLen() == 0 {
|
||||
// Otherwise set latest valid hash to parent hash
|
||||
currentHash = latestValid.Hash()
|
||||
}
|
||||
|
@ -773,8 +773,8 @@ func TestTrickRemoteBlockCache(t *testing.T) {
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if status.Status == beacon.INVALID {
|
||||
panic("success")
|
||||
if status.Status == beacon.VALID {
|
||||
t.Error("invalid status: VALID on an invalid chain")
|
||||
}
|
||||
// Now reorg to the head of the invalid chain
|
||||
resp, err := apiB.ForkchoiceUpdatedV1(beacon.ForkchoiceStateV1{HeadBlockHash: payload.BlockHash, SafeBlockHash: payload.BlockHash, FinalizedBlockHash: payload.ParentHash}, nil)
|
||||
@ -782,7 +782,7 @@ func TestTrickRemoteBlockCache(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if resp.PayloadStatus.Status == beacon.VALID {
|
||||
t.Errorf("invalid status: expected INVALID got: %v", resp.PayloadStatus.Status)
|
||||
t.Error("invalid status: VALID on an invalid chain")
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
|
@ -137,6 +137,13 @@ func (b *beaconBackfiller) setMode(mode SyncMode) {
|
||||
b.resume()
|
||||
}
|
||||
|
||||
// SetBadBlockCallback sets the callback to run when a bad block is hit by the
|
||||
// block processor. This method is not thread safe and should be set only once
|
||||
// on startup before system events are fired.
|
||||
func (d *Downloader) SetBadBlockCallback(onBadBlock badBlockFn) {
|
||||
d.badBlock = onBadBlock
|
||||
}
|
||||
|
||||
// BeaconSync is the post-merge version of the chain synchronization, where the
|
||||
// chain is not downloaded from genesis onward, rather from trusted head announces
|
||||
// backwards.
|
||||
|
@ -85,6 +85,10 @@ var (
|
||||
// peerDropFn is a callback type for dropping a peer detected as malicious.
|
||||
type peerDropFn func(id string)
|
||||
|
||||
// badBlockFn is a callback for the async beacon sync to notify the caller that
|
||||
// the origin header requested to sync to, produced a chain with a bad block.
|
||||
type badBlockFn func(invalid *types.Header, origin *types.Header)
|
||||
|
||||
// headerTask is a set of downloaded headers to queue along with their precomputed
|
||||
// hashes to avoid constant rehashing.
|
||||
type headerTask struct {
|
||||
@ -113,6 +117,7 @@ type Downloader struct {
|
||||
|
||||
// Callbacks
|
||||
dropPeer peerDropFn // Drops a peer for misbehaving
|
||||
badBlock badBlockFn // Reports a block as rejected by the chain
|
||||
|
||||
// Status
|
||||
synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
|
||||
@ -1528,7 +1533,7 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
|
||||
return errCancelContentProcessing
|
||||
default:
|
||||
}
|
||||
// Retrieve the a batch of results to import
|
||||
// Retrieve a batch of results to import
|
||||
first, last := results[0].Header, results[len(results)-1].Header
|
||||
log.Debug("Inserting downloaded chain", "items", len(results),
|
||||
"firstnum", first.Number, "firsthash", first.Hash(),
|
||||
@ -1544,6 +1549,16 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
|
||||
if index, err := d.blockchain.InsertChain(blocks); err != nil {
|
||||
if index < len(results) {
|
||||
log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
|
||||
|
||||
// In post-merge, notify the engine API of encountered bad chains
|
||||
if d.badBlock != nil {
|
||||
head, _, err := d.skeleton.Bounds()
|
||||
if err != nil {
|
||||
log.Error("Failed to retrieve beacon bounds for bad block reporting", "err", err)
|
||||
} else {
|
||||
d.badBlock(blocks[index].Header(), head)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// The InsertChain method in blockchain.go will sometimes return an out-of-bounds index,
|
||||
// when it needs to preprocess blocks to import a sidechain.
|
||||
|
Loading…
Reference in New Issue
Block a user