core: fixed race condition in the transaction pool
Removed `Stop/Start` mechanism from the transaction pool.
This commit is contained in:
		
							parent
							
								
									858a6f0be9
								
							
						
					
					
						commit
						65a48f9cd8
					
				| @ -50,7 +50,7 @@ type TxPool struct { | ||||
| } | ||||
| 
 | ||||
| func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool { | ||||
| 	return &TxPool{ | ||||
| 	pool := &TxPool{ | ||||
| 		pending:      make(map[common.Hash]*types.Transaction), | ||||
| 		queue:        make(map[common.Address]map[common.Hash]*types.Transaction), | ||||
| 		quit:         make(chan bool), | ||||
| @ -59,9 +59,12 @@ func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func( | ||||
| 		gasLimit:     gasLimitFn, | ||||
| 		pendingState: state.ManageState(currentStateFn()), | ||||
| 	} | ||||
| 	go pool.eventLoop() | ||||
| 
 | ||||
| 	return pool | ||||
| } | ||||
| 
 | ||||
| func (pool *TxPool) Start() { | ||||
| func (pool *TxPool) eventLoop() { | ||||
| 	// Track chain events. When a chain events occurs (new chain canon block)
 | ||||
| 	// we need to know the new state. The new state will help us determine
 | ||||
| 	// the nonces in the managed state
 | ||||
| @ -169,15 +172,10 @@ func (pool *TxPool) validateTx(tx *types.Transaction) error { | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // validate and queue transactions.
 | ||||
| func (self *TxPool) add(tx *types.Transaction) error { | ||||
| 	hash := tx.Hash() | ||||
| 
 | ||||
| 	/* XXX I'm unsure about this. This is extremely dangerous and may result | ||||
| 	 in total black listing of certain transactions | ||||
| 	if self.invalidHashes.Has(hash) { | ||||
| 		return fmt.Errorf("Invalid transaction (%x)", hash[:4]) | ||||
| 	} | ||||
| 	*/ | ||||
| 	if self.pending[hash] != nil { | ||||
| 		return fmt.Errorf("Known transaction (%x)", hash[:4]) | ||||
| 	} | ||||
| @ -207,6 +205,30 @@ func (self *TxPool) add(tx *types.Transaction) error { | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // queueTx will queue an unknown transaction
 | ||||
| func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) { | ||||
| 	from, _ := tx.From() // already validated
 | ||||
| 	if self.queue[from] == nil { | ||||
| 		self.queue[from] = make(map[common.Hash]*types.Transaction) | ||||
| 	} | ||||
| 	self.queue[from][hash] = tx | ||||
| } | ||||
| 
 | ||||
| // addTx will add a transaction to the pending (processable queue) list of transactions
 | ||||
| func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Transaction) { | ||||
| 	if _, ok := pool.pending[hash]; !ok { | ||||
| 		pool.pending[hash] = tx | ||||
| 
 | ||||
| 		// Increment the nonce on the pending state. This can only happen if
 | ||||
| 		// the nonce is +1 to the previous one.
 | ||||
| 		pool.pendingState.SetNonce(addr, tx.AccountNonce+1) | ||||
| 		// Notify the subscribers. This event is posted in a goroutine
 | ||||
| 		// because it's possible that somewhere during the post "Remove transaction"
 | ||||
| 		// gets called which will then wait for the global tx pool lock and deadlock.
 | ||||
| 		go pool.eventMux.Post(TxPreEvent{tx}) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // Add queues a single transaction in the pool if it is valid.
 | ||||
| func (self *TxPool) Add(tx *types.Transaction) error { | ||||
| 	self.mu.Lock() | ||||
| @ -290,28 +312,6 @@ func (self *TxPool) RemoveTransactions(txs types.Transactions) { | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) { | ||||
| 	from, _ := tx.From() // already validated
 | ||||
| 	if self.queue[from] == nil { | ||||
| 		self.queue[from] = make(map[common.Hash]*types.Transaction) | ||||
| 	} | ||||
| 	self.queue[from][hash] = tx | ||||
| } | ||||
| 
 | ||||
| func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Transaction) { | ||||
| 	if _, ok := pool.pending[hash]; !ok { | ||||
| 		pool.pending[hash] = tx | ||||
| 
 | ||||
| 		// Increment the nonce on the pending state. This can only happen if
 | ||||
| 		// the nonce is +1 to the previous one.
 | ||||
| 		pool.pendingState.SetNonce(addr, tx.AccountNonce+1) | ||||
| 		// Notify the subscribers. This event is posted in a goroutine
 | ||||
| 		// because it's possible that somewhere during the post "Remove transaction"
 | ||||
| 		// gets called which will then wait for the global tx pool lock and deadlock.
 | ||||
| 		go pool.eventMux.Post(TxPreEvent{tx}) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // checkQueue moves transactions that have become processable to main pool.
 | ||||
| func (pool *TxPool) checkQueue() { | ||||
| 	state := pool.pendingState | ||||
|  | ||||
| @ -466,8 +466,6 @@ func (s *Ethereum) Start() error { | ||||
| 		s.StartAutoDAG() | ||||
| 	} | ||||
| 
 | ||||
| 	// Start services
 | ||||
| 	go s.txPool.Start() | ||||
| 	s.protocolManager.Start() | ||||
| 
 | ||||
| 	if s.whisper != nil { | ||||
| @ -513,9 +511,6 @@ func (s *Ethereum) StartForTest() { | ||||
| 		ClientString:    s.net.Name, | ||||
| 		ProtocolVersion: ProtocolVersion, | ||||
| 	}) | ||||
| 
 | ||||
| 	// Start services
 | ||||
| 	s.txPool.Start() | ||||
| } | ||||
| 
 | ||||
| // AddPeer connects to the given node and maintains the connection until the
 | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user