Merge pull request #3138 from karalabe/txpool-pending-limits
core: add global (soft) limits on the pending transactions
This commit is contained in:
commit
a4d9e63d12
@ -30,6 +30,7 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/event"
|
"github.com/ethereum/go-ethereum/event"
|
||||||
"github.com/ethereum/go-ethereum/logger"
|
"github.com/ethereum/go-ethereum/logger"
|
||||||
"github.com/ethereum/go-ethereum/logger/glog"
|
"github.com/ethereum/go-ethereum/logger/glog"
|
||||||
|
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -46,10 +47,12 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
maxQueuedPerAccount = uint64(64) // Max limit of queued transactions per address
|
minPendingPerAccount = uint64(16) // Min number of guaranteed transaction slots per address
|
||||||
maxQueuedInTotal = uint64(8192) // Max limit of queued transactions from all accounts
|
maxPendingTotal = uint64(4096) // Max limit of pending transactions from all accounts (soft)
|
||||||
maxQueuedLifetime = 3 * time.Hour // Max amount of time transactions from idle accounts are queued
|
maxQueuedPerAccount = uint64(64) // Max limit of queued transactions per address
|
||||||
evictionInterval = time.Minute // Time interval to check for evictable transactions
|
maxQueuedInTotal = uint64(1024) // Max limit of queued transactions from all accounts
|
||||||
|
maxQueuedLifetime = 3 * time.Hour // Max amount of time transactions from idle accounts are queued
|
||||||
|
evictionInterval = time.Minute // Time interval to check for evictable transactions
|
||||||
)
|
)
|
||||||
|
|
||||||
type stateFn func() (*state.StateDB, error)
|
type stateFn func() (*state.StateDB, error)
|
||||||
@ -481,7 +484,6 @@ func (pool *TxPool) promoteExecutables() {
|
|||||||
}
|
}
|
||||||
// Iterate over all accounts and promote any executable transactions
|
// Iterate over all accounts and promote any executable transactions
|
||||||
queued := uint64(0)
|
queued := uint64(0)
|
||||||
|
|
||||||
for addr, list := range pool.queue {
|
for addr, list := range pool.queue {
|
||||||
// Drop all transactions that are deemed too old (low nonce)
|
// Drop all transactions that are deemed too old (low nonce)
|
||||||
for _, tx := range list.Forward(state.GetNonce(addr)) {
|
for _, tx := range list.Forward(state.GetNonce(addr)) {
|
||||||
@ -519,6 +521,59 @@ func (pool *TxPool) promoteExecutables() {
|
|||||||
delete(pool.queue, addr)
|
delete(pool.queue, addr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// If the pending limit is overflown, start equalizing allowances
|
||||||
|
pending := uint64(0)
|
||||||
|
for _, list := range pool.pending {
|
||||||
|
pending += uint64(list.Len())
|
||||||
|
}
|
||||||
|
if pending > maxPendingTotal {
|
||||||
|
// Assemble a spam order to penalize large transactors first
|
||||||
|
spammers := prque.New()
|
||||||
|
for addr, list := range pool.pending {
|
||||||
|
// Only evict transactions from high rollers
|
||||||
|
if uint64(list.Len()) > minPendingPerAccount {
|
||||||
|
// Skip local accounts as pools should maintain backlogs for themselves
|
||||||
|
for _, tx := range list.txs.items {
|
||||||
|
if !pool.localTx.contains(tx.Hash()) {
|
||||||
|
spammers.Push(addr, float32(list.Len()))
|
||||||
|
}
|
||||||
|
break // Checking on transaction for locality is enough
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Gradually drop transactions from offenders
|
||||||
|
offenders := []common.Address{}
|
||||||
|
for pending > maxPendingTotal && !spammers.Empty() {
|
||||||
|
// Retrieve the next offender if not local address
|
||||||
|
offender, _ := spammers.Pop()
|
||||||
|
offenders = append(offenders, offender.(common.Address))
|
||||||
|
|
||||||
|
// Equalize balances until all the same or below threshold
|
||||||
|
if len(offenders) > 1 {
|
||||||
|
// Calculate the equalization threshold for all current offenders
|
||||||
|
threshold := pool.pending[offender.(common.Address)].Len()
|
||||||
|
|
||||||
|
// Iteratively reduce all offenders until below limit or threshold reached
|
||||||
|
for pending > maxPendingTotal && pool.pending[offenders[len(offenders)-2]].Len() > threshold {
|
||||||
|
for i := 0; i < len(offenders)-1; i++ {
|
||||||
|
list := pool.pending[offenders[i]]
|
||||||
|
list.Cap(list.Len() - 1)
|
||||||
|
pending--
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// If still above threshold, reduce to limit or min allowance
|
||||||
|
if pending > maxPendingTotal && len(offenders) > 0 {
|
||||||
|
for pending > maxPendingTotal && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > minPendingPerAccount {
|
||||||
|
for _, addr := range offenders {
|
||||||
|
list := pool.pending[addr]
|
||||||
|
list.Cap(list.Len() - 1)
|
||||||
|
pending--
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
// If we've queued more transactions than the hard limit, drop oldest ones
|
// If we've queued more transactions than the hard limit, drop oldest ones
|
||||||
if queued > maxQueuedInTotal {
|
if queued > maxQueuedInTotal {
|
||||||
// Sort all accounts with queued transactions by heartbeat
|
// Sort all accounts with queued transactions by heartbeat
|
||||||
|
@ -618,6 +618,96 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Tests that if the transaction count belonging to multiple accounts go above
|
||||||
|
// some hard threshold, the higher transactions are dropped to prevent DOS
|
||||||
|
// attacks.
|
||||||
|
func TestTransactionPendingGlobalLimiting(t *testing.T) {
|
||||||
|
// Reduce the queue limits to shorten test time
|
||||||
|
defer func(old uint64) { maxPendingTotal = old }(maxPendingTotal)
|
||||||
|
maxPendingTotal = minPendingPerAccount * 10
|
||||||
|
|
||||||
|
// Create the pool to test the limit enforcement with
|
||||||
|
db, _ := ethdb.NewMemDatabase()
|
||||||
|
statedb, _ := state.New(common.Hash{}, db)
|
||||||
|
|
||||||
|
pool := NewTxPool(testChainConfig(), new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
|
||||||
|
pool.resetState()
|
||||||
|
|
||||||
|
// Create a number of test accounts and fund them
|
||||||
|
state, _ := pool.currentState()
|
||||||
|
|
||||||
|
keys := make([]*ecdsa.PrivateKey, 5)
|
||||||
|
for i := 0; i < len(keys); i++ {
|
||||||
|
keys[i], _ = crypto.GenerateKey()
|
||||||
|
state.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
|
||||||
|
}
|
||||||
|
// Generate and queue a batch of transactions
|
||||||
|
nonces := make(map[common.Address]uint64)
|
||||||
|
|
||||||
|
txs := types.Transactions{}
|
||||||
|
for _, key := range keys {
|
||||||
|
addr := crypto.PubkeyToAddress(key.PublicKey)
|
||||||
|
for j := 0; j < int(maxPendingTotal)/len(keys)*2; j++ {
|
||||||
|
txs = append(txs, transaction(nonces[addr], big.NewInt(100000), key))
|
||||||
|
nonces[addr]++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Import the batch and verify that limits have been enforced
|
||||||
|
pool.AddBatch(txs)
|
||||||
|
|
||||||
|
pending := 0
|
||||||
|
for _, list := range pool.pending {
|
||||||
|
pending += list.Len()
|
||||||
|
}
|
||||||
|
if pending > int(maxPendingTotal) {
|
||||||
|
t.Fatalf("total pending transactions overflow allowance: %d > %d", pending, maxPendingTotal)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tests that if the transaction count belonging to multiple accounts go above
|
||||||
|
// some hard threshold, if they are under the minimum guaranteed slot count then
|
||||||
|
// the transactions are still kept.
|
||||||
|
func TestTransactionPendingMinimumAllowance(t *testing.T) {
|
||||||
|
// Reduce the queue limits to shorten test time
|
||||||
|
defer func(old uint64) { maxPendingTotal = old }(maxPendingTotal)
|
||||||
|
maxPendingTotal = 0
|
||||||
|
|
||||||
|
// Create the pool to test the limit enforcement with
|
||||||
|
db, _ := ethdb.NewMemDatabase()
|
||||||
|
statedb, _ := state.New(common.Hash{}, db)
|
||||||
|
|
||||||
|
pool := NewTxPool(testChainConfig(), new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
|
||||||
|
pool.resetState()
|
||||||
|
|
||||||
|
// Create a number of test accounts and fund them
|
||||||
|
state, _ := pool.currentState()
|
||||||
|
|
||||||
|
keys := make([]*ecdsa.PrivateKey, 5)
|
||||||
|
for i := 0; i < len(keys); i++ {
|
||||||
|
keys[i], _ = crypto.GenerateKey()
|
||||||
|
state.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
|
||||||
|
}
|
||||||
|
// Generate and queue a batch of transactions
|
||||||
|
nonces := make(map[common.Address]uint64)
|
||||||
|
|
||||||
|
txs := types.Transactions{}
|
||||||
|
for _, key := range keys {
|
||||||
|
addr := crypto.PubkeyToAddress(key.PublicKey)
|
||||||
|
for j := 0; j < int(minPendingPerAccount)*2; j++ {
|
||||||
|
txs = append(txs, transaction(nonces[addr], big.NewInt(100000), key))
|
||||||
|
nonces[addr]++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Import the batch and verify that limits have been enforced
|
||||||
|
pool.AddBatch(txs)
|
||||||
|
|
||||||
|
for addr, list := range pool.pending {
|
||||||
|
if list.Len() != int(minPendingPerAccount) {
|
||||||
|
t.Errorf("addr %x: total pending transactions mismatch: have %d, want %d", addr, list.Len(), minPendingPerAccount)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Benchmarks the speed of validating the contents of the pending queue of the
|
// Benchmarks the speed of validating the contents of the pending queue of the
|
||||||
// transaction pool.
|
// transaction pool.
|
||||||
func BenchmarkPendingDemotion100(b *testing.B) { benchmarkPendingDemotion(b, 100) }
|
func BenchmarkPendingDemotion100(b *testing.B) { benchmarkPendingDemotion(b, 100) }
|
||||||
|
Loading…
Reference in New Issue
Block a user