core: fix race conditions in txpool (#23474)
* core: fix race conditions in txpool * core: fixed races in the txpool * core: rebased on master * core: move reheap mutex * core: renamed mutex * core: revert Reheap changes
This commit is contained in:
		
							parent
							
								
									d019e90162
								
							
						
					
					
						commit
						067084feda
					
				| @ -21,6 +21,8 @@ import ( | ||||
| 	"math" | ||||
| 	"math/big" | ||||
| 	"sort" | ||||
| 	"sync" | ||||
| 	"sync/atomic" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/ethereum/go-ethereum/common" | ||||
| @ -478,9 +480,10 @@ func (h *priceHeap) Pop() interface{} { | ||||
| // better candidates for inclusion while in other cases (at the top of the baseFee peak)
 | ||||
| // the floating heap is better. When baseFee is decreasing they behave similarly.
 | ||||
| type txPricedList struct { | ||||
| 	all              *txLookup // Pointer to the map of all transactions
 | ||||
| 	urgent, floating priceHeap // Heaps of prices of all the stored **remote** transactions
 | ||||
| 	stales           int       // Number of stale price points to (re-heap trigger)
 | ||||
| 	all              *txLookup  // Pointer to the map of all transactions
 | ||||
| 	urgent, floating priceHeap  // Heaps of prices of all the stored **remote** transactions
 | ||||
| 	stales           int64      // Number of stale price points to (re-heap trigger)
 | ||||
| 	reheapMu         sync.Mutex // Mutex asserts that only one routine is reheaping the list
 | ||||
| } | ||||
| 
 | ||||
| const ( | ||||
| @ -510,8 +513,8 @@ func (l *txPricedList) Put(tx *types.Transaction, local bool) { | ||||
| // the heap if a large enough ratio of transactions go stale.
 | ||||
| func (l *txPricedList) Removed(count int) { | ||||
| 	// Bump the stale counter, but exit if still too low (< 25%)
 | ||||
| 	l.stales += count | ||||
| 	if l.stales <= (len(l.urgent.list)+len(l.floating.list))/4 { | ||||
| 	stales := atomic.AddInt64(&l.stales, int64(count)) | ||||
| 	if int(stales) <= (len(l.urgent.list)+len(l.floating.list))/4 { | ||||
| 		return | ||||
| 	} | ||||
| 	// Seems we've reached a critical number of stale transactions, reheap
 | ||||
| @ -535,7 +538,7 @@ func (l *txPricedList) underpricedFor(h *priceHeap, tx *types.Transaction) bool | ||||
| 	for len(h.list) > 0 { | ||||
| 		head := h.list[0] | ||||
| 		if l.all.GetRemote(head.Hash()) == nil { // Removed or migrated
 | ||||
| 			l.stales-- | ||||
| 			atomic.AddInt64(&l.stales, -1) | ||||
| 			heap.Pop(h) | ||||
| 			continue | ||||
| 		} | ||||
| @ -561,7 +564,7 @@ func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool) | ||||
| 			// Discard stale transactions if found during cleanup
 | ||||
| 			tx := heap.Pop(&l.urgent).(*types.Transaction) | ||||
| 			if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated
 | ||||
| 				l.stales-- | ||||
| 				atomic.AddInt64(&l.stales, -1) | ||||
| 				continue | ||||
| 			} | ||||
| 			// Non stale transaction found, move to floating heap
 | ||||
| @ -574,7 +577,7 @@ func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool) | ||||
| 			// Discard stale transactions if found during cleanup
 | ||||
| 			tx := heap.Pop(&l.floating).(*types.Transaction) | ||||
| 			if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated
 | ||||
| 				l.stales-- | ||||
| 				atomic.AddInt64(&l.stales, -1) | ||||
| 				continue | ||||
| 			} | ||||
| 			// Non stale transaction found, discard it
 | ||||
| @ -594,8 +597,10 @@ func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool) | ||||
| 
 | ||||
| // Reheap forcibly rebuilds the heap based on the current remote transaction set.
 | ||||
| func (l *txPricedList) Reheap() { | ||||
| 	l.reheapMu.Lock() | ||||
| 	defer l.reheapMu.Unlock() | ||||
| 	start := time.Now() | ||||
| 	l.stales = 0 | ||||
| 	atomic.StoreInt64(&l.stales, 0) | ||||
| 	l.urgent.list = make([]*types.Transaction, 0, l.all.RemoteCount()) | ||||
| 	l.all.Range(func(hash common.Hash, tx *types.Transaction, local bool) bool { | ||||
| 		l.urgent.list = append(l.urgent.list, tx) | ||||
|  | ||||
| @ -22,6 +22,7 @@ import ( | ||||
| 	"math/big" | ||||
| 	"sort" | ||||
| 	"sync" | ||||
| 	"sync/atomic" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/ethereum/go-ethereum/common" | ||||
| @ -264,6 +265,7 @@ type TxPool struct { | ||||
| 	reorgDoneCh     chan chan struct{} | ||||
| 	reorgShutdownCh chan struct{}  // requests shutdown of scheduleReorgLoop
 | ||||
| 	wg              sync.WaitGroup // tracks loop, scheduleReorgLoop
 | ||||
| 	initDoneCh      chan struct{}  // is closed once the pool is initialized (for tests)
 | ||||
| 
 | ||||
| 	changesSinceReorg int // A counter for how many drops we've performed in-between reorg.
 | ||||
| } | ||||
| @ -294,6 +296,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block | ||||
| 		queueTxEventCh:  make(chan *types.Transaction), | ||||
| 		reorgDoneCh:     make(chan chan struct{}), | ||||
| 		reorgShutdownCh: make(chan struct{}), | ||||
| 		initDoneCh:      make(chan struct{}), | ||||
| 		gasPrice:        new(big.Int).SetUint64(config.PriceLimit), | ||||
| 	} | ||||
| 	pool.locals = newAccountSet(pool.signer) | ||||
| @ -347,6 +350,8 @@ func (pool *TxPool) loop() { | ||||
| 	defer evict.Stop() | ||||
| 	defer journal.Stop() | ||||
| 
 | ||||
| 	// Notify tests that the init phase is done
 | ||||
| 	close(pool.initDoneCh) | ||||
| 	for { | ||||
| 		select { | ||||
| 		// Handle ChainHeadEvent
 | ||||
| @ -365,8 +370,8 @@ func (pool *TxPool) loop() { | ||||
| 		case <-report.C: | ||||
| 			pool.mu.RLock() | ||||
| 			pending, queued := pool.stats() | ||||
| 			stales := pool.priced.stales | ||||
| 			pool.mu.RUnlock() | ||||
| 			stales := int(atomic.LoadInt64(&pool.priced.stales)) | ||||
| 
 | ||||
| 			if pending != prevPending || queued != prevQueued || stales != prevStales { | ||||
| 				log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales) | ||||
|  | ||||
| @ -24,6 +24,7 @@ import ( | ||||
| 	"math/big" | ||||
| 	"math/rand" | ||||
| 	"os" | ||||
| 	"sync/atomic" | ||||
| 	"testing" | ||||
| 	"time" | ||||
| 
 | ||||
| @ -64,7 +65,7 @@ type testBlockChain struct { | ||||
| 
 | ||||
| func (bc *testBlockChain) CurrentBlock() *types.Block { | ||||
| 	return types.NewBlock(&types.Header{ | ||||
| 		GasLimit: bc.gasLimit, | ||||
| 		GasLimit: atomic.LoadUint64(&bc.gasLimit), | ||||
| 	}, nil, nil, nil, trie.NewStackTrie(nil)) | ||||
| } | ||||
| 
 | ||||
| @ -123,6 +124,8 @@ func setupTxPoolWithConfig(config *params.ChainConfig) (*TxPool, *ecdsa.PrivateK | ||||
| 	key, _ := crypto.GenerateKey() | ||||
| 	pool := NewTxPool(testTxPoolConfig, config, blockchain) | ||||
| 
 | ||||
| 	// wait for the pool to initialize
 | ||||
| 	<-pool.initDoneCh | ||||
| 	return pool, key | ||||
| } | ||||
| 
 | ||||
| @ -625,7 +628,7 @@ func TestTransactionDropping(t *testing.T) { | ||||
| 		t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 4) | ||||
| 	} | ||||
| 	// Reduce the block gas limit, check that invalidated transactions are dropped
 | ||||
| 	pool.chain.(*testBlockChain).gasLimit = 100 | ||||
| 	atomic.StoreUint64(&pool.chain.(*testBlockChain).gasLimit, 100) | ||||
| 	<-pool.requestReset(nil, nil) | ||||
| 
 | ||||
| 	if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok { | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user