core/txpool, eth/catalyst: fix racy simulator due to txpool background reset (#28837)

This PR fixes an issues in the new simulated backend. The root cause is the fact that the transaction pool has an internal reset operation that runs on a background thread.

When a new transaction is added to the pool via the RPC, the transaction is added to a non-executable queue and will be moved to its final location on a background thread. If the machine is overloaded (or simply due to timing issues), it can happen that the simulated backend will try to produce the next block, whilst the pool has not yet marked the newly added transaction executable. This will cause the block to not contain the transaction. This is an issue because we want determinism from the simulator: add a tx, mine a block. It should be in there.

The PR fixes it by adding a Sync function to the txpool, which waits for the current reset operation (if any) to finish, and then runs an entire round of reset on top. The new round is needed because resets are only triggered by new head events, so newly added transactions will not trigger the outer resets that we can wait on. The transaction pool would eventually internally do a reset even on transaction addition, but there's no easy way to wait on that and there's no meaningful reason to bubble that across everything. A clean outer reset will at worse be a small noop goroutine.
This commit is contained in:
Péter Szilágyi 2024-01-23 20:59:38 +01:00 committed by GitHub
parent 98eaa57e6f
commit 542c861b4f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 85 additions and 10 deletions

View File

@ -72,6 +72,9 @@ type TxPool struct {
subs event.SubscriptionScope // Subscription scope to unsubscribe all on shutdown subs event.SubscriptionScope // Subscription scope to unsubscribe all on shutdown
quit chan chan error // Quit channel to tear down the head updater quit chan chan error // Quit channel to tear down the head updater
term chan struct{} // Termination channel to detect a closed pool
sync chan chan error // Testing / simulator channel to block until internal reset is done
} }
// New creates a new transaction pool to gather, sort and filter inbound // New creates a new transaction pool to gather, sort and filter inbound
@ -86,6 +89,8 @@ func New(gasTip *big.Int, chain BlockChain, subpools []SubPool) (*TxPool, error)
subpools: subpools, subpools: subpools,
reservations: make(map[common.Address]SubPool), reservations: make(map[common.Address]SubPool),
quit: make(chan chan error), quit: make(chan chan error),
term: make(chan struct{}),
sync: make(chan chan error),
} }
for i, subpool := range subpools { for i, subpool := range subpools {
if err := subpool.Init(gasTip, head, pool.reserver(i, subpool)); err != nil { if err := subpool.Init(gasTip, head, pool.reserver(i, subpool)); err != nil {
@ -174,6 +179,9 @@ func (p *TxPool) Close() error {
// outside blockchain events as well as for various reporting and transaction // outside blockchain events as well as for various reporting and transaction
// eviction events. // eviction events.
func (p *TxPool) loop(head *types.Header, chain BlockChain) { func (p *TxPool) loop(head *types.Header, chain BlockChain) {
// Close the termination marker when the pool stops
defer close(p.term)
// Subscribe to chain head events to trigger subpool resets // Subscribe to chain head events to trigger subpool resets
var ( var (
newHeadCh = make(chan core.ChainHeadEvent) newHeadCh = make(chan core.ChainHeadEvent)
@ -190,13 +198,23 @@ func (p *TxPool) loop(head *types.Header, chain BlockChain) {
var ( var (
resetBusy = make(chan struct{}, 1) // Allow 1 reset to run concurrently resetBusy = make(chan struct{}, 1) // Allow 1 reset to run concurrently
resetDone = make(chan *types.Header) resetDone = make(chan *types.Header)
resetForced bool // Whether a forced reset was requested, only used in simulator mode
resetWaiter chan error // Channel waiting on a forced reset, only used in simulator mode
) )
// Notify the live reset waiter to not block if the txpool is closed.
defer func() {
if resetWaiter != nil {
resetWaiter <- errors.New("pool already terminated")
resetWaiter = nil
}
}()
var errc chan error var errc chan error
for errc == nil { for errc == nil {
// Something interesting might have happened, run a reset if there is // Something interesting might have happened, run a reset if there is
// one needed but none is running. The resetter will run on its own // one needed but none is running. The resetter will run on its own
// goroutine to allow chain head events to be consumed contiguously. // goroutine to allow chain head events to be consumed contiguously.
if newHead != oldHead { if newHead != oldHead || resetForced {
// Try to inject a busy marker and start a reset if successful // Try to inject a busy marker and start a reset if successful
select { select {
case resetBusy <- struct{}{}: case resetBusy <- struct{}{}:
@ -208,8 +226,17 @@ func (p *TxPool) loop(head *types.Header, chain BlockChain) {
resetDone <- newHead resetDone <- newHead
}(oldHead, newHead) }(oldHead, newHead)
// If the reset operation was explicitly requested, consider it
// being fulfilled and drop the request marker. If it was not,
// this is a noop.
resetForced = false
default: default:
// Reset already running, wait until it finishes // Reset already running, wait until it finishes.
//
// Note, this will not drop any forced reset request. If a forced
// reset was requested, but we were busy, then when the currently
// running reset finishes, a new one will be spun up.
} }
} }
// Wait for the next chain head event or a previous reset finish // Wait for the next chain head event or a previous reset finish
@ -223,8 +250,26 @@ func (p *TxPool) loop(head *types.Header, chain BlockChain) {
oldHead = head oldHead = head
<-resetBusy <-resetBusy
// If someone is waiting for a reset to finish, notify them, unless
// the forced op is still pending. In that case, wait another round
// of resets.
if resetWaiter != nil && !resetForced {
resetWaiter <- nil
resetWaiter = nil
}
case errc = <-p.quit: case errc = <-p.quit:
// Termination requested, break out on the next loop round // Termination requested, break out on the next loop round
case syncc := <-p.sync:
// Transaction pool is running inside a simulator, and we are about
// to create a new block. Request a forced sync operation to ensure
// that any running reset operation finishes to make block imports
// deterministic. On top of that, run a new reset operation to make
// transaction insertions deterministic instead of being stuck in a
// queue waiting for a reset.
resetForced = true
resetWaiter = syncc
} }
} }
// Notify the closer of termination (no error possible for now) // Notify the closer of termination (no error possible for now)
@ -415,3 +460,20 @@ func (p *TxPool) Status(hash common.Hash) TxStatus {
} }
return TxStatusUnknown return TxStatusUnknown
} }
// Sync is a helper method for unit tests or simulator runs where the chain events
// are arriving in quick succession, without any time in between them to run the
// internal background reset operations. This method will run an explicit reset
// operation to ensure the pool stabilises, thus avoiding flakey behavior.
//
// Note, do not use this in production / live code. In live code, the pool is
// meant to reset on a separate thread to avoid DoS vectors.
func (p *TxPool) Sync() error {
sync := make(chan error)
select {
case p.sync <- sync:
return <-sync
case <-p.term:
return errors.New("pool already terminated")
}
}

View File

@ -180,7 +180,7 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV1(update engine.ForkchoiceStateV1, pa
return engine.STATUS_INVALID, engine.InvalidParams.With(errors.New("forkChoiceUpdateV1 called post-shanghai")) return engine.STATUS_INVALID, engine.InvalidParams.With(errors.New("forkChoiceUpdateV1 called post-shanghai"))
} }
} }
return api.forkchoiceUpdated(update, payloadAttributes) return api.forkchoiceUpdated(update, payloadAttributes, false)
} }
// ForkchoiceUpdatedV2 is equivalent to V1 with the addition of withdrawals in the payload attributes. // ForkchoiceUpdatedV2 is equivalent to V1 with the addition of withdrawals in the payload attributes.
@ -196,7 +196,7 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV2(update engine.ForkchoiceStateV1, pa
return engine.STATUS_INVALID, engine.UnsupportedFork.With(errors.New("forkchoiceUpdatedV2 must only be called for shanghai payloads")) return engine.STATUS_INVALID, engine.UnsupportedFork.With(errors.New("forkchoiceUpdatedV2 must only be called for shanghai payloads"))
} }
} }
return api.forkchoiceUpdated(update, params) return api.forkchoiceUpdated(update, params, false)
} }
// ForkchoiceUpdatedV3 is equivalent to V2 with the addition of parent beacon block root in the payload attributes. // ForkchoiceUpdatedV3 is equivalent to V2 with the addition of parent beacon block root in the payload attributes.
@ -220,10 +220,10 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV3(update engine.ForkchoiceStateV1, pa
// hash, even if params are wrong. To do this we need to split up // hash, even if params are wrong. To do this we need to split up
// forkchoiceUpdate into a function that only updates the head and then a // forkchoiceUpdate into a function that only updates the head and then a
// function that kicks off block construction. // function that kicks off block construction.
return api.forkchoiceUpdated(update, params) return api.forkchoiceUpdated(update, params, false)
} }
func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) { func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes, simulatorMode bool) (engine.ForkChoiceResponse, error) {
api.forkchoiceLock.Lock() api.forkchoiceLock.Lock()
defer api.forkchoiceLock.Unlock() defer api.forkchoiceLock.Unlock()
@ -330,7 +330,7 @@ func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payl
if merger := api.eth.Merger(); !merger.PoSFinalized() { if merger := api.eth.Merger(); !merger.PoSFinalized() {
merger.FinalizePoS() merger.FinalizePoS()
} }
// If the finalized block is not in our canonical tree, somethings wrong // If the finalized block is not in our canonical tree, something is wrong
finalBlock := api.eth.BlockChain().GetBlockByHash(update.FinalizedBlockHash) finalBlock := api.eth.BlockChain().GetBlockByHash(update.FinalizedBlockHash)
if finalBlock == nil { if finalBlock == nil {
log.Warn("Final block not available in database", "hash", update.FinalizedBlockHash) log.Warn("Final block not available in database", "hash", update.FinalizedBlockHash)
@ -342,7 +342,7 @@ func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payl
// Set the finalized block // Set the finalized block
api.eth.BlockChain().SetFinalized(finalBlock.Header()) api.eth.BlockChain().SetFinalized(finalBlock.Header())
} }
// Check if the safe block hash is in our canonical tree, if not somethings wrong // Check if the safe block hash is in our canonical tree, if not something is wrong
if update.SafeBlockHash != (common.Hash{}) { if update.SafeBlockHash != (common.Hash{}) {
safeBlock := api.eth.BlockChain().GetBlockByHash(update.SafeBlockHash) safeBlock := api.eth.BlockChain().GetBlockByHash(update.SafeBlockHash)
if safeBlock == nil { if safeBlock == nil {
@ -374,6 +374,19 @@ func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payl
if api.localBlocks.has(id) { if api.localBlocks.has(id) {
return valid(&id), nil return valid(&id), nil
} }
// If the beacon chain is ran by a simulator, then transaction insertion,
// block insertion and block production will happen without any timing
// delay between them. This will cause flaky simulator executions due to
// the transaction pool running its internal reset operation on a back-
// ground thread. To avoid the racey behavior - in simulator mode - the
// pool will be explicitly blocked on its reset before continuing to the
// block production below.
if simulatorMode {
if err := api.eth.TxPool().Sync(); err != nil {
log.Error("Failed to sync transaction pool", "err", err)
return valid(nil), engine.InvalidPayloadAttributes.With(err)
}
}
payload, err := api.eth.Miner().BuildPayload(args) payload, err := api.eth.Miner().BuildPayload(args)
if err != nil { if err != nil {
log.Error("Failed to build payload", "err", err) log.Error("Failed to build payload", "err", err)

View File

@ -155,12 +155,12 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u
var random [32]byte var random [32]byte
rand.Read(random[:]) rand.Read(random[:])
fcResponse, err := c.engineAPI.ForkchoiceUpdatedV2(c.curForkchoiceState, &engine.PayloadAttributes{ fcResponse, err := c.engineAPI.forkchoiceUpdated(c.curForkchoiceState, &engine.PayloadAttributes{
Timestamp: timestamp, Timestamp: timestamp,
SuggestedFeeRecipient: feeRecipient, SuggestedFeeRecipient: feeRecipient,
Withdrawals: withdrawals, Withdrawals: withdrawals,
Random: random, Random: random,
}) }, true)
if err != nil { if err != nil {
return err return err
} }