Merge pull request #20081 from karalabe/txpool-lockless-dedup

core: dedup known transactions without global lock, track metrics
This commit is contained in:
Péter Szilágyi 2019-09-17 16:25:24 +03:00 committed by GitHub
commit d4dce43bff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -97,7 +97,8 @@ var (
queuedNofundsMeter = metrics.NewRegisteredMeter("txpool/queued/nofunds", nil) // Dropped due to out-of-funds queuedNofundsMeter = metrics.NewRegisteredMeter("txpool/queued/nofunds", nil) // Dropped due to out-of-funds
// General tx metrics // General tx metrics
validMeter = metrics.NewRegisteredMeter("txpool/valid", nil) knownTxMeter = metrics.NewRegisteredMeter("txpool/known", nil)
validTxMeter = metrics.NewRegisteredMeter("txpool/valid", nil)
invalidTxMeter = metrics.NewRegisteredMeter("txpool/invalid", nil) invalidTxMeter = metrics.NewRegisteredMeter("txpool/invalid", nil)
underpricedTxMeter = metrics.NewRegisteredMeter("txpool/underpriced", nil) underpricedTxMeter = metrics.NewRegisteredMeter("txpool/underpriced", nil)
@ -564,16 +565,15 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
hash := tx.Hash() hash := tx.Hash()
if pool.all.Get(hash) != nil { if pool.all.Get(hash) != nil {
log.Trace("Discarding already known transaction", "hash", hash) log.Trace("Discarding already known transaction", "hash", hash)
knownTxMeter.Mark(1)
return false, fmt.Errorf("known transaction: %x", hash) return false, fmt.Errorf("known transaction: %x", hash)
} }
// If the transaction fails basic validation, discard it // If the transaction fails basic validation, discard it
if err := pool.validateTx(tx, local); err != nil { if err := pool.validateTx(tx, local); err != nil {
log.Trace("Discarding invalid transaction", "hash", hash, "err", err) log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
invalidTxMeter.Mark(1) invalidTxMeter.Mark(1)
return false, err return false, err
} }
// If the transaction pool is full, discard underpriced transactions // If the transaction pool is full, discard underpriced transactions
if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue { if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
// If the new transaction is underpriced, don't accept it // If the new transaction is underpriced, don't accept it
@ -590,7 +590,6 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
pool.removeTx(tx.Hash(), false) pool.removeTx(tx.Hash(), false)
} }
} }
// Try to replace an existing transaction in the pending pool // Try to replace an existing transaction in the pending pool
from, _ := types.Sender(pool.signer, tx) // already validated from, _ := types.Sender(pool.signer, tx) // already validated
if list := pool.pending[from]; list != nil && list.Overlaps(tx) { if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
@ -613,13 +612,11 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To()) log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
return old != nil, nil return old != nil, nil
} }
// New transaction isn't replacing a pending one, push into queue // New transaction isn't replacing a pending one, push into queue
replaced, err = pool.enqueueTx(hash, tx) replaced, err = pool.enqueueTx(hash, tx)
if err != nil { if err != nil {
return false, err return false, err
} }
// Mark local addresses and journal local transactions // Mark local addresses and journal local transactions
if local { if local {
if !pool.locals.contains(from) { if !pool.locals.contains(from) {
@ -768,11 +765,18 @@ func (pool *TxPool) AddRemote(tx *types.Transaction) error {
// addTxs attempts to queue a batch of transactions if they are valid. // addTxs attempts to queue a batch of transactions if they are valid.
func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error { func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error {
// Filter out known ones without obtaining the pool lock or recovering signatures
for i := 0; i < len(txs); i++ {
if pool.all.Get(txs[i].Hash()) != nil {
knownTxMeter.Mark(1)
txs = append(txs[:i], txs[i+1:]...)
i--
}
}
// Cache senders in transactions before obtaining lock (pool.signer is immutable) // Cache senders in transactions before obtaining lock (pool.signer is immutable)
for _, tx := range txs { for _, tx := range txs {
types.Sender(pool.signer, tx) types.Sender(pool.signer, tx)
} }
pool.mu.Lock() pool.mu.Lock()
errs, dirtyAddrs := pool.addTxsLocked(txs, local) errs, dirtyAddrs := pool.addTxsLocked(txs, local)
pool.mu.Unlock() pool.mu.Unlock()
@ -796,7 +800,7 @@ func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error,
dirty.addTx(tx) dirty.addTx(tx)
} }
} }
validMeter.Mark(int64(len(dirty.accounts))) validTxMeter.Mark(int64(len(dirty.accounts)))
return errs, dirty return errs, dirty
} }