forked from cerc-io/plugeth
core: count tx size in slots, bump max size ot 4x32KB (#20352)
* tests for tx size * alow multiple slots transactions * tests for tx size limit (32 KB) * change tx size tests to use addRemoteSync instead of validateTx (requested in pool request). * core: minor tx slotting polishes, add slot tracking metric Co-authored-by: Michael Riabzev <RiabzevMichael@gmail.com> Co-authored-by: Péter Szilágyi <peterke@gmail.com>
This commit is contained in:
parent
b5c4ea56b8
commit
8bd37a1d91
@ -494,11 +494,11 @@ func (l *txPricedList) Underpriced(tx *types.Transaction, local *accountSet) boo
|
|||||||
|
|
||||||
// Discard finds a number of most underpriced transactions, removes them from the
|
// Discard finds a number of most underpriced transactions, removes them from the
|
||||||
// priced list and returns them for further removal from the entire pool.
|
// priced list and returns them for further removal from the entire pool.
|
||||||
func (l *txPricedList) Discard(count int, local *accountSet) types.Transactions {
|
func (l *txPricedList) Discard(slots int, local *accountSet) types.Transactions {
|
||||||
drop := make(types.Transactions, 0, count) // Remote underpriced transactions to drop
|
drop := make(types.Transactions, 0, slots) // Remote underpriced transactions to drop
|
||||||
save := make(types.Transactions, 0, 64) // Local underpriced transactions to keep
|
save := make(types.Transactions, 0, 64) // Local underpriced transactions to keep
|
||||||
|
|
||||||
for len(*l.items) > 0 && count > 0 {
|
for len(*l.items) > 0 && slots > 0 {
|
||||||
// Discard stale transactions if found during cleanup
|
// Discard stale transactions if found during cleanup
|
||||||
tx := heap.Pop(l.items).(*types.Transaction)
|
tx := heap.Pop(l.items).(*types.Transaction)
|
||||||
if l.all.Get(tx.Hash()) == nil {
|
if l.all.Get(tx.Hash()) == nil {
|
||||||
@ -510,7 +510,7 @@ func (l *txPricedList) Discard(count int, local *accountSet) types.Transactions
|
|||||||
save = append(save, tx)
|
save = append(save, tx)
|
||||||
} else {
|
} else {
|
||||||
drop = append(drop, tx)
|
drop = append(drop, tx)
|
||||||
count--
|
slots -= numSlots(tx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for _, tx := range save {
|
for _, tx := range save {
|
||||||
|
@ -38,6 +38,18 @@ import (
|
|||||||
const (
|
const (
|
||||||
// chainHeadChanSize is the size of channel listening to ChainHeadEvent.
|
// chainHeadChanSize is the size of channel listening to ChainHeadEvent.
|
||||||
chainHeadChanSize = 10
|
chainHeadChanSize = 10
|
||||||
|
|
||||||
|
// txSlotSize is used to calculate how many data slots a single transaction
|
||||||
|
// takes up based on its size. The slots are used as DoS protection, ensuring
|
||||||
|
// that validating a new transaction remains a constant operation (in reality
|
||||||
|
// O(maxslots), where max slots are 4 currently).
|
||||||
|
txSlotSize = 32 * 1024
|
||||||
|
|
||||||
|
// txMaxSize is the maximum size a single transaction can have. This field has
|
||||||
|
// non-trivial consequences: larger transactions are significantly harder and
|
||||||
|
// more expensive to propagate; larger transactions also take more resources
|
||||||
|
// to validate whether they fit into the pool or not.
|
||||||
|
txMaxSize = 4 * txSlotSize // 128KB, don't bump without chunking support
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -105,6 +117,7 @@ var (
|
|||||||
pendingGauge = metrics.NewRegisteredGauge("txpool/pending", nil)
|
pendingGauge = metrics.NewRegisteredGauge("txpool/pending", nil)
|
||||||
queuedGauge = metrics.NewRegisteredGauge("txpool/queued", nil)
|
queuedGauge = metrics.NewRegisteredGauge("txpool/queued", nil)
|
||||||
localGauge = metrics.NewRegisteredGauge("txpool/local", nil)
|
localGauge = metrics.NewRegisteredGauge("txpool/local", nil)
|
||||||
|
slotsGauge = metrics.NewRegisteredGauge("txpool/slots", nil)
|
||||||
)
|
)
|
||||||
|
|
||||||
// TxStatus is the current status of a transaction as seen by the pool.
|
// TxStatus is the current status of a transaction as seen by the pool.
|
||||||
@ -510,8 +523,8 @@ func (pool *TxPool) local() map[common.Address]types.Transactions {
|
|||||||
// validateTx checks whether a transaction is valid according to the consensus
|
// validateTx checks whether a transaction is valid according to the consensus
|
||||||
// rules and adheres to some heuristic limits of the local node (price and size).
|
// rules and adheres to some heuristic limits of the local node (price and size).
|
||||||
func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
|
func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
|
||||||
// Heuristic limit, reject transactions over 32KB to prevent DOS attacks
|
// Reject transactions over defined size to prevent DOS attacks
|
||||||
if tx.Size() > 32*1024 {
|
if uint64(tx.Size()) > txMaxSize {
|
||||||
return ErrOversizedData
|
return ErrOversizedData
|
||||||
}
|
}
|
||||||
// Transactions can't be negative. This may never happen using RLP decoded
|
// Transactions can't be negative. This may never happen using RLP decoded
|
||||||
@ -583,7 +596,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
|
|||||||
return false, ErrUnderpriced
|
return false, ErrUnderpriced
|
||||||
}
|
}
|
||||||
// New transaction is better than our worse ones, make room for it
|
// New transaction is better than our worse ones, make room for it
|
||||||
drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals)
|
drop := pool.priced.Discard(pool.all.Slots()-int(pool.config.GlobalSlots+pool.config.GlobalQueue)+numSlots(tx), pool.locals)
|
||||||
for _, tx := range drop {
|
for _, tx := range drop {
|
||||||
log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice())
|
log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice())
|
||||||
underpricedTxMeter.Mark(1)
|
underpricedTxMeter.Mark(1)
|
||||||
@ -1493,8 +1506,9 @@ func (as *accountSet) merge(other *accountSet) {
|
|||||||
// peeking into the pool in TxPool.Get without having to acquire the widely scoped
|
// peeking into the pool in TxPool.Get without having to acquire the widely scoped
|
||||||
// TxPool.mu mutex.
|
// TxPool.mu mutex.
|
||||||
type txLookup struct {
|
type txLookup struct {
|
||||||
all map[common.Hash]*types.Transaction
|
all map[common.Hash]*types.Transaction
|
||||||
lock sync.RWMutex
|
slots int
|
||||||
|
lock sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// newTxLookup returns a new txLookup structure.
|
// newTxLookup returns a new txLookup structure.
|
||||||
@ -1532,11 +1546,22 @@ func (t *txLookup) Count() int {
|
|||||||
return len(t.all)
|
return len(t.all)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Slots returns the current number of slots used in the lookup.
|
||||||
|
func (t *txLookup) Slots() int {
|
||||||
|
t.lock.RLock()
|
||||||
|
defer t.lock.RUnlock()
|
||||||
|
|
||||||
|
return t.slots
|
||||||
|
}
|
||||||
|
|
||||||
// Add adds a transaction to the lookup.
|
// Add adds a transaction to the lookup.
|
||||||
func (t *txLookup) Add(tx *types.Transaction) {
|
func (t *txLookup) Add(tx *types.Transaction) {
|
||||||
t.lock.Lock()
|
t.lock.Lock()
|
||||||
defer t.lock.Unlock()
|
defer t.lock.Unlock()
|
||||||
|
|
||||||
|
t.slots += numSlots(tx)
|
||||||
|
slotsGauge.Update(int64(t.slots))
|
||||||
|
|
||||||
t.all[tx.Hash()] = tx
|
t.all[tx.Hash()] = tx
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1545,5 +1570,13 @@ func (t *txLookup) Remove(hash common.Hash) {
|
|||||||
t.lock.Lock()
|
t.lock.Lock()
|
||||||
defer t.lock.Unlock()
|
defer t.lock.Unlock()
|
||||||
|
|
||||||
|
t.slots -= numSlots(t.all[hash])
|
||||||
|
slotsGauge.Update(int64(t.slots))
|
||||||
|
|
||||||
delete(t.all, hash)
|
delete(t.all, hash)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// numSlots calculates the number of slots needed for a single transaction.
|
||||||
|
func numSlots(tx *types.Transaction) int {
|
||||||
|
return int((tx.Size() + txSlotSize - 1) / txSlotSize)
|
||||||
|
}
|
||||||
|
@ -77,9 +77,17 @@ func pricedTransaction(nonce uint64, gaslimit uint64, gasprice *big.Int, key *ec
|
|||||||
return tx
|
return tx
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func pricedDataTransaction(nonce uint64, gaslimit uint64, gasprice *big.Int, key *ecdsa.PrivateKey, bytes uint64) *types.Transaction {
|
||||||
|
data := make([]byte, bytes)
|
||||||
|
rand.Read(data)
|
||||||
|
|
||||||
|
tx, _ := types.SignTx(types.NewTransaction(nonce, common.Address{}, big.NewInt(0), gaslimit, gasprice, data), types.HomesteadSigner{}, key)
|
||||||
|
return tx
|
||||||
|
}
|
||||||
|
|
||||||
func setupTxPool() (*TxPool, *ecdsa.PrivateKey) {
|
func setupTxPool() (*TxPool, *ecdsa.PrivateKey) {
|
||||||
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()))
|
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()))
|
||||||
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)}
|
blockchain := &testBlockChain{statedb, 10000000, new(event.Feed)}
|
||||||
|
|
||||||
key, _ := crypto.GenerateKey()
|
key, _ := crypto.GenerateKey()
|
||||||
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
|
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
|
||||||
@ -465,7 +473,7 @@ func TestTransactionDropping(t *testing.T) {
|
|||||||
pool, key := setupTxPool()
|
pool, key := setupTxPool()
|
||||||
defer pool.Stop()
|
defer pool.Stop()
|
||||||
|
|
||||||
account, _ := deriveSender(transaction(0, 0, key))
|
account := crypto.PubkeyToAddress(key.PublicKey)
|
||||||
pool.currentState.AddBalance(account, big.NewInt(1000))
|
pool.currentState.AddBalance(account, big.NewInt(1000))
|
||||||
|
|
||||||
// Add some pending and some queued transactions
|
// Add some pending and some queued transactions
|
||||||
@ -674,7 +682,7 @@ func TestTransactionGapFilling(t *testing.T) {
|
|||||||
pool, key := setupTxPool()
|
pool, key := setupTxPool()
|
||||||
defer pool.Stop()
|
defer pool.Stop()
|
||||||
|
|
||||||
account, _ := deriveSender(transaction(0, 0, key))
|
account := crypto.PubkeyToAddress(key.PublicKey)
|
||||||
pool.currentState.AddBalance(account, big.NewInt(1000000))
|
pool.currentState.AddBalance(account, big.NewInt(1000000))
|
||||||
|
|
||||||
// Keep track of transaction events to ensure all executables get announced
|
// Keep track of transaction events to ensure all executables get announced
|
||||||
@ -728,7 +736,7 @@ func TestTransactionQueueAccountLimiting(t *testing.T) {
|
|||||||
pool, key := setupTxPool()
|
pool, key := setupTxPool()
|
||||||
defer pool.Stop()
|
defer pool.Stop()
|
||||||
|
|
||||||
account, _ := deriveSender(transaction(0, 0, key))
|
account := crypto.PubkeyToAddress(key.PublicKey)
|
||||||
pool.currentState.AddBalance(account, big.NewInt(1000000))
|
pool.currentState.AddBalance(account, big.NewInt(1000000))
|
||||||
|
|
||||||
// Keep queuing up transactions and make sure all above a limit are dropped
|
// Keep queuing up transactions and make sure all above a limit are dropped
|
||||||
@ -923,7 +931,7 @@ func TestTransactionPendingLimiting(t *testing.T) {
|
|||||||
pool, key := setupTxPool()
|
pool, key := setupTxPool()
|
||||||
defer pool.Stop()
|
defer pool.Stop()
|
||||||
|
|
||||||
account, _ := deriveSender(transaction(0, 0, key))
|
account := crypto.PubkeyToAddress(key.PublicKey)
|
||||||
pool.currentState.AddBalance(account, big.NewInt(1000000))
|
pool.currentState.AddBalance(account, big.NewInt(1000000))
|
||||||
|
|
||||||
// Keep track of transaction events to ensure all executables get announced
|
// Keep track of transaction events to ensure all executables get announced
|
||||||
@ -1002,6 +1010,62 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test the limit on transaction size is enforced correctly.
|
||||||
|
// This test verifies every transaction having allowed size
|
||||||
|
// is added to the pool, and longer transactions are rejected.
|
||||||
|
func TestTransactionAllowedTxSize(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
// Create a test account and fund it
|
||||||
|
pool, key := setupTxPool()
|
||||||
|
defer pool.Stop()
|
||||||
|
|
||||||
|
account := crypto.PubkeyToAddress(key.PublicKey)
|
||||||
|
pool.currentState.AddBalance(account, big.NewInt(1000000000))
|
||||||
|
|
||||||
|
// Compute maximal data size for transactions (lower bound).
|
||||||
|
//
|
||||||
|
// It is assumed the fields in the transaction (except of the data) are:
|
||||||
|
// - nonce <= 32 bytes
|
||||||
|
// - gasPrice <= 32 bytes
|
||||||
|
// - gasLimit <= 32 bytes
|
||||||
|
// - recipient == 20 bytes
|
||||||
|
// - value <= 32 bytes
|
||||||
|
// - signature == 65 bytes
|
||||||
|
// All those fields are summed up to at most 213 bytes.
|
||||||
|
baseSize := uint64(213)
|
||||||
|
dataSize := txMaxSize - baseSize
|
||||||
|
|
||||||
|
// Try adding a transaction with maximal allowed size
|
||||||
|
tx := pricedDataTransaction(0, pool.currentMaxGas, big.NewInt(1), key, dataSize)
|
||||||
|
if err := pool.addRemoteSync(tx); err != nil {
|
||||||
|
t.Fatalf("failed to add transaction of size %d, close to maximal: %v", int(tx.Size()), err)
|
||||||
|
}
|
||||||
|
// Try adding a transaction with random allowed size
|
||||||
|
if err := pool.addRemoteSync(pricedDataTransaction(1, pool.currentMaxGas, big.NewInt(1), key, uint64(rand.Intn(int(dataSize))))); err != nil {
|
||||||
|
t.Fatalf("failed to add transaction of random allowed size: %v", err)
|
||||||
|
}
|
||||||
|
// Try adding a transaction of minimal not allowed size
|
||||||
|
if err := pool.addRemoteSync(pricedDataTransaction(2, pool.currentMaxGas, big.NewInt(1), key, txMaxSize)); err == nil {
|
||||||
|
t.Fatalf("expected rejection on slightly oversize transaction")
|
||||||
|
}
|
||||||
|
// Try adding a transaction of random not allowed size
|
||||||
|
if err := pool.addRemoteSync(pricedDataTransaction(2, pool.currentMaxGas, big.NewInt(1), key, dataSize+1+uint64(rand.Intn(int(10*txMaxSize))))); err == nil {
|
||||||
|
t.Fatalf("expected rejection on oversize transaction")
|
||||||
|
}
|
||||||
|
// Run some sanity checks on the pool internals
|
||||||
|
pending, queued := pool.Stats()
|
||||||
|
if pending != 2 {
|
||||||
|
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2)
|
||||||
|
}
|
||||||
|
if queued != 0 {
|
||||||
|
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
|
||||||
|
}
|
||||||
|
if err := validateTxPoolInternals(pool); err != nil {
|
||||||
|
t.Fatalf("pool internal state corrupted: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Tests that if transactions start being capped, transactions are also removed from 'all'
|
// Tests that if transactions start being capped, transactions are also removed from 'all'
|
||||||
func TestTransactionCapClearsFromAll(t *testing.T) {
|
func TestTransactionCapClearsFromAll(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
@ -1752,6 +1816,24 @@ func TestTransactionStatusCheck(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test the transaction slots consumption is computed correctly
|
||||||
|
func TestTransactionSlotCount(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
key, _ := crypto.GenerateKey()
|
||||||
|
|
||||||
|
// Check that an empty transaction consumes a single slot
|
||||||
|
smallTx := pricedDataTransaction(0, 0, big.NewInt(0), key, 0)
|
||||||
|
if slots := numSlots(smallTx); slots != 1 {
|
||||||
|
t.Fatalf("small transactions slot count mismatch: have %d want %d", slots, 1)
|
||||||
|
}
|
||||||
|
// Check that a large transaction consumes the correct number of slots
|
||||||
|
bigTx := pricedDataTransaction(0, 0, big.NewInt(0), key, uint64(10*txSlotSize))
|
||||||
|
if slots := numSlots(bigTx); slots != 11 {
|
||||||
|
t.Fatalf("big transactions slot count mismatch: have %d want %d", slots, 11)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Benchmarks the speed of validating the contents of the pending queue of the
|
// Benchmarks the speed of validating the contents of the pending queue of the
|
||||||
// transaction pool.
|
// transaction pool.
|
||||||
func BenchmarkPendingDemotion100(b *testing.B) { benchmarkPendingDemotion(b, 100) }
|
func BenchmarkPendingDemotion100(b *testing.B) { benchmarkPendingDemotion(b, 100) }
|
||||||
@ -1763,7 +1845,7 @@ func benchmarkPendingDemotion(b *testing.B, size int) {
|
|||||||
pool, key := setupTxPool()
|
pool, key := setupTxPool()
|
||||||
defer pool.Stop()
|
defer pool.Stop()
|
||||||
|
|
||||||
account, _ := deriveSender(transaction(0, 0, key))
|
account := crypto.PubkeyToAddress(key.PublicKey)
|
||||||
pool.currentState.AddBalance(account, big.NewInt(1000000))
|
pool.currentState.AddBalance(account, big.NewInt(1000000))
|
||||||
|
|
||||||
for i := 0; i < size; i++ {
|
for i := 0; i < size; i++ {
|
||||||
@ -1788,7 +1870,7 @@ func benchmarkFuturePromotion(b *testing.B, size int) {
|
|||||||
pool, key := setupTxPool()
|
pool, key := setupTxPool()
|
||||||
defer pool.Stop()
|
defer pool.Stop()
|
||||||
|
|
||||||
account, _ := deriveSender(transaction(0, 0, key))
|
account := crypto.PubkeyToAddress(key.PublicKey)
|
||||||
pool.currentState.AddBalance(account, big.NewInt(1000000))
|
pool.currentState.AddBalance(account, big.NewInt(1000000))
|
||||||
|
|
||||||
for i := 0; i < size; i++ {
|
for i := 0; i < size; i++ {
|
||||||
@ -1812,7 +1894,7 @@ func benchmarkPoolBatchInsert(b *testing.B, size int) {
|
|||||||
pool, key := setupTxPool()
|
pool, key := setupTxPool()
|
||||||
defer pool.Stop()
|
defer pool.Stop()
|
||||||
|
|
||||||
account, _ := deriveSender(transaction(0, 0, key))
|
account := crypto.PubkeyToAddress(key.PublicKey)
|
||||||
pool.currentState.AddBalance(account, big.NewInt(1000000))
|
pool.currentState.AddBalance(account, big.NewInt(1000000))
|
||||||
|
|
||||||
batches := make([]types.Transactions, b.N)
|
batches := make([]types.Transactions, b.N)
|
||||||
|
Loading…
Reference in New Issue
Block a user