Merge pull request #2742 from karalabe/tx-spam-protection
Transaction pool optimizations
This commit is contained in:
commit
a42b7355f4
342
core/tx_list.go
Normal file
342
core/tx_list.go
Normal file
@ -0,0 +1,342 @@
|
||||
// Copyright 2016 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package core
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"math"
|
||||
"math/big"
|
||||
"sort"
|
||||
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
)
|
||||
|
||||
// nonceHeap is a heap.Interface implementation over 64bit unsigned integers for
|
||||
// retrieving sorted transactions from the possibly gapped future queue.
|
||||
type nonceHeap []uint64
|
||||
|
||||
func (h nonceHeap) Len() int { return len(h) }
|
||||
func (h nonceHeap) Less(i, j int) bool { return h[i] < h[j] }
|
||||
func (h nonceHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
|
||||
|
||||
func (h *nonceHeap) Push(x interface{}) {
|
||||
*h = append(*h, x.(uint64))
|
||||
}
|
||||
|
||||
func (h *nonceHeap) Pop() interface{} {
|
||||
old := *h
|
||||
n := len(old)
|
||||
x := old[n-1]
|
||||
*h = old[0 : n-1]
|
||||
return x
|
||||
}
|
||||
|
||||
// txSortedMap is a nonce->transaction hash map with a heap based index to allow
|
||||
// iterating over the contents in a nonce-incrementing way.
|
||||
type txSortedMap struct {
|
||||
items map[uint64]*types.Transaction // Hash map storing the transaction data
|
||||
index *nonceHeap // Heap of nonces of all the stored transactions (non-strict mode)
|
||||
cache types.Transactions // Cache of the transactions already sorted
|
||||
}
|
||||
|
||||
// newTxSortedMap creates a new sorted transaction map.
|
||||
func newTxSortedMap() *txSortedMap {
|
||||
return &txSortedMap{
|
||||
items: make(map[uint64]*types.Transaction),
|
||||
index: &nonceHeap{},
|
||||
}
|
||||
}
|
||||
|
||||
// Get retrieves the current transactions associated with the given nonce.
|
||||
func (m *txSortedMap) Get(nonce uint64) *types.Transaction {
|
||||
return m.items[nonce]
|
||||
}
|
||||
|
||||
// Put inserts a new transaction into the map, also updating the map's nonce
|
||||
// index. If a transaction already exists with the same nonce, it's overwritten.
|
||||
func (m *txSortedMap) Put(tx *types.Transaction) {
|
||||
nonce := tx.Nonce()
|
||||
if m.items[nonce] == nil {
|
||||
heap.Push(m.index, nonce)
|
||||
}
|
||||
m.items[nonce], m.cache = tx, nil
|
||||
}
|
||||
|
||||
// Forward removes all transactions from the map with a nonce lower than the
|
||||
// provided threshold. Every removed transaction is returned for any post-removal
|
||||
// maintenance.
|
||||
func (m *txSortedMap) Forward(threshold uint64) types.Transactions {
|
||||
var removed types.Transactions
|
||||
|
||||
// Pop off heap items until the threshold is reached
|
||||
for m.index.Len() > 0 && (*m.index)[0] < threshold {
|
||||
nonce := heap.Pop(m.index).(uint64)
|
||||
removed = append(removed, m.items[nonce])
|
||||
delete(m.items, nonce)
|
||||
}
|
||||
// If we had a cached order, shift the front
|
||||
if m.cache != nil {
|
||||
m.cache = m.cache[len(removed):]
|
||||
}
|
||||
return removed
|
||||
}
|
||||
|
||||
// Filter iterates over the list of transactions and removes all of them for which
|
||||
// the specified function evaluates to true.
|
||||
func (m *txSortedMap) Filter(filter func(*types.Transaction) bool) types.Transactions {
|
||||
var removed types.Transactions
|
||||
|
||||
// Collect all the transactions to filter out
|
||||
for nonce, tx := range m.items {
|
||||
if filter(tx) {
|
||||
removed = append(removed, tx)
|
||||
delete(m.items, nonce)
|
||||
}
|
||||
}
|
||||
// If transactions were removed, the heap and cache are ruined
|
||||
if len(removed) > 0 {
|
||||
*m.index = make([]uint64, 0, len(m.items))
|
||||
for nonce, _ := range m.items {
|
||||
*m.index = append(*m.index, nonce)
|
||||
}
|
||||
heap.Init(m.index)
|
||||
|
||||
m.cache = nil
|
||||
}
|
||||
return removed
|
||||
}
|
||||
|
||||
// Cap places a hard limit on the number of items, returning all transactions
|
||||
// exceeding that limit.
|
||||
func (m *txSortedMap) Cap(threshold int) types.Transactions {
|
||||
// Short circuit if the number of items is under the limit
|
||||
if len(m.items) <= threshold {
|
||||
return nil
|
||||
}
|
||||
// Otherwise gather and drop the highest nonce'd transactions
|
||||
var drops types.Transactions
|
||||
|
||||
sort.Sort(*m.index)
|
||||
for size := len(m.items); size > threshold; size-- {
|
||||
drops = append(drops, m.items[(*m.index)[size-1]])
|
||||
delete(m.items, (*m.index)[size-1])
|
||||
}
|
||||
*m.index = (*m.index)[:threshold]
|
||||
heap.Init(m.index)
|
||||
|
||||
// If we had a cache, shift the back
|
||||
if m.cache != nil {
|
||||
m.cache = m.cache[:len(m.cache)-len(drops)]
|
||||
}
|
||||
return drops
|
||||
}
|
||||
|
||||
// Remove deletes a transaction from the maintained map, returning whether the
|
||||
// transaction was found.
|
||||
func (m *txSortedMap) Remove(nonce uint64) bool {
|
||||
// Short circuit if no transaction is present
|
||||
_, ok := m.items[nonce]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
// Otherwise delete the transaction and fix the heap index
|
||||
for i := 0; i < m.index.Len(); i++ {
|
||||
if (*m.index)[i] == nonce {
|
||||
heap.Remove(m.index, i)
|
||||
break
|
||||
}
|
||||
}
|
||||
delete(m.items, nonce)
|
||||
m.cache = nil
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// Ready retrieves a sequentially increasing list of transactions starting at the
|
||||
// provided nonce that is ready for processing. The returned transactions will be
|
||||
// removed from the list.
|
||||
//
|
||||
// Note, all transactions with nonces lower than start will also be returned to
|
||||
// prevent getting into and invalid state. This is not something that should ever
|
||||
// happen but better to be self correcting than failing!
|
||||
func (m *txSortedMap) Ready(start uint64) types.Transactions {
|
||||
// Short circuit if no transactions are available
|
||||
if m.index.Len() == 0 || (*m.index)[0] > start {
|
||||
return nil
|
||||
}
|
||||
// Otherwise start accumulating incremental transactions
|
||||
var ready types.Transactions
|
||||
for next := (*m.index)[0]; m.index.Len() > 0 && (*m.index)[0] == next; next++ {
|
||||
ready = append(ready, m.items[next])
|
||||
delete(m.items, next)
|
||||
heap.Pop(m.index)
|
||||
}
|
||||
m.cache = nil
|
||||
|
||||
return ready
|
||||
}
|
||||
|
||||
// Len returns the length of the transaction map.
|
||||
func (m *txSortedMap) Len() int {
|
||||
return len(m.items)
|
||||
}
|
||||
|
||||
// Flatten creates a nonce-sorted slice of transactions based on the loosely
|
||||
// sorted internal representation. The result of the sorting is cached in case
|
||||
// it's requested again before any modifications are made to the contents.
|
||||
func (m *txSortedMap) Flatten() types.Transactions {
|
||||
// If the sorting was not cached yet, create and cache it
|
||||
if m.cache == nil {
|
||||
m.cache = make(types.Transactions, 0, len(m.items))
|
||||
for _, tx := range m.items {
|
||||
m.cache = append(m.cache, tx)
|
||||
}
|
||||
sort.Sort(types.TxByNonce(m.cache))
|
||||
}
|
||||
// Copy the cache to prevent accidental modifications
|
||||
txs := make(types.Transactions, len(m.cache))
|
||||
copy(txs, m.cache)
|
||||
return txs
|
||||
}
|
||||
|
||||
// txList is a "list" of transactions belonging to an account, sorted by account
|
||||
// nonce. The same type can be used both for storing contiguous transactions for
|
||||
// the executable/pending queue; and for storing gapped transactions for the non-
|
||||
// executable/future queue, with minor behavoiral changes.
|
||||
type txList struct {
|
||||
strict bool // Whether nonces are strictly continuous or not
|
||||
txs *txSortedMap // Heap indexed sorted hash map of the transactions
|
||||
costcap *big.Int // Price of the highest costing transaction (reset only if exceeds balance)
|
||||
}
|
||||
|
||||
// newTxList create a new transaction list for maintaining nonce-indexable fast,
|
||||
// gapped, sortable transaction lists.
|
||||
func newTxList(strict bool) *txList {
|
||||
return &txList{
|
||||
strict: strict,
|
||||
txs: newTxSortedMap(),
|
||||
costcap: new(big.Int),
|
||||
}
|
||||
}
|
||||
|
||||
// Add tries to insert a new transaction into the list, returning whether the
|
||||
// transaction was accepted, and if yes, any previous transaction it replaced.
|
||||
//
|
||||
// If the new transaction is accepted into the list, the lists' cost threshold
|
||||
// is also potentially updated.
|
||||
func (l *txList) Add(tx *types.Transaction) (bool, *types.Transaction) {
|
||||
// If there's an older better transaction, abort
|
||||
old := l.txs.Get(tx.Nonce())
|
||||
if old != nil && old.GasPrice().Cmp(tx.GasPrice()) >= 0 {
|
||||
return false, nil
|
||||
}
|
||||
// Otherwise overwrite the old transaction with the current one
|
||||
l.txs.Put(tx)
|
||||
if cost := tx.Cost(); l.costcap.Cmp(cost) < 0 {
|
||||
l.costcap = cost
|
||||
}
|
||||
return true, old
|
||||
}
|
||||
|
||||
// Forward removes all transactions from the list with a nonce lower than the
|
||||
// provided threshold. Every removed transaction is returned for any post-removal
|
||||
// maintenance.
|
||||
func (l *txList) Forward(threshold uint64) types.Transactions {
|
||||
return l.txs.Forward(threshold)
|
||||
}
|
||||
|
||||
// Filter removes all transactions from the list with a cost higher than the
|
||||
// provided threshold. Every removed transaction is returned for any post-removal
|
||||
// maintenance. Strict-mode invalidated transactions are also returned.
|
||||
//
|
||||
// This method uses the cached costcap to quickly decide if there's even a point
|
||||
// in calculating all the costs or if the balance covers all. If the threshold is
|
||||
// lower than the costcap, the costcap will be reset to a new high after removing
|
||||
// expensive the too transactions.
|
||||
func (l *txList) Filter(threshold *big.Int) (types.Transactions, types.Transactions) {
|
||||
// If all transactions are below the threshold, short circuit
|
||||
if l.costcap.Cmp(threshold) <= 0 {
|
||||
return nil, nil
|
||||
}
|
||||
l.costcap = new(big.Int).Set(threshold) // Lower the cap to the threshold
|
||||
|
||||
// Filter out all the transactions above the account's funds
|
||||
removed := l.txs.Filter(func(tx *types.Transaction) bool { return tx.Cost().Cmp(threshold) > 0 })
|
||||
|
||||
// If the list was strict, filter anything above the lowest nonce
|
||||
var invalids types.Transactions
|
||||
if l.strict && len(removed) > 0 {
|
||||
lowest := uint64(math.MaxUint64)
|
||||
for _, tx := range removed {
|
||||
if nonce := tx.Nonce(); lowest > nonce {
|
||||
lowest = nonce
|
||||
}
|
||||
}
|
||||
invalids = l.txs.Filter(func(tx *types.Transaction) bool { return tx.Nonce() > lowest })
|
||||
}
|
||||
return removed, invalids
|
||||
}
|
||||
|
||||
// Cap places a hard limit on the number of items, returning all transactions
|
||||
// exceeding that limit.
|
||||
func (l *txList) Cap(threshold int) types.Transactions {
|
||||
return l.txs.Cap(threshold)
|
||||
}
|
||||
|
||||
// Remove deletes a transaction from the maintained list, returning whether the
|
||||
// transaction was found, and also returning any transaction invalidated due to
|
||||
// the deletion (strict mode only).
|
||||
func (l *txList) Remove(tx *types.Transaction) (bool, types.Transactions) {
|
||||
// Remove the transaction from the set
|
||||
nonce := tx.Nonce()
|
||||
if removed := l.txs.Remove(nonce); !removed {
|
||||
return false, nil
|
||||
}
|
||||
// In strict mode, filter out non-executable transactions
|
||||
if l.strict {
|
||||
return true, l.txs.Filter(func(tx *types.Transaction) bool { return tx.Nonce() > nonce })
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// Ready retrieves a sequentially increasing list of transactions starting at the
|
||||
// provided nonce that is ready for processing. The returned transactions will be
|
||||
// removed from the list.
|
||||
//
|
||||
// Note, all transactions with nonces lower than start will also be returned to
|
||||
// prevent getting into and invalid state. This is not something that should ever
|
||||
// happen but better to be self correcting than failing!
|
||||
func (l *txList) Ready(start uint64) types.Transactions {
|
||||
return l.txs.Ready(start)
|
||||
}
|
||||
|
||||
// Len returns the length of the transaction list.
|
||||
func (l *txList) Len() int {
|
||||
return l.txs.Len()
|
||||
}
|
||||
|
||||
// Empty returns whether the list of transactions is empty or not.
|
||||
func (l *txList) Empty() bool {
|
||||
return l.Len() == 0
|
||||
}
|
||||
|
||||
// Flatten creates a nonce-sorted slice of transactions based on the loosely
|
||||
// sorted internal representation. The result of the sorting is cached in case
|
||||
// it's requested again before any modifications are made to the contents.
|
||||
func (l *txList) Flatten() types.Transactions {
|
||||
return l.txs.Flatten()
|
||||
}
|
52
core/tx_list_test.go
Normal file
52
core/tx_list_test.go
Normal file
@ -0,0 +1,52 @@
|
||||
// Copyright 2016 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package core
|
||||
|
||||
import (
|
||||
"math/big"
|
||||
"math/rand"
|
||||
"testing"
|
||||
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
)
|
||||
|
||||
// Tests that transactions can be added to strict lists and list contents and
|
||||
// nonce boundaries are correctly maintained.
|
||||
func TestStrictTxListAdd(t *testing.T) {
|
||||
// Generate a list of transactions to insert
|
||||
key, _ := crypto.GenerateKey()
|
||||
|
||||
txs := make(types.Transactions, 1024)
|
||||
for i := 0; i < len(txs); i++ {
|
||||
txs[i] = transaction(uint64(i), new(big.Int), key)
|
||||
}
|
||||
// Insert the transactions in a random order
|
||||
list := newTxList(true)
|
||||
for _, v := range rand.Perm(len(txs)) {
|
||||
list.Add(txs[v])
|
||||
}
|
||||
// Verify internal state
|
||||
if len(list.txs.items) != len(txs) {
|
||||
t.Errorf("transaction count mismatch: have %d, want %d", len(list.txs.items), len(txs))
|
||||
}
|
||||
for i, tx := range txs {
|
||||
if list.txs.items[tx.Nonce()] != tx {
|
||||
t.Errorf("item %d: transaction mismatch: have %v, want %v", i, list.txs.items[tx.Nonce()], tx)
|
||||
}
|
||||
}
|
||||
}
|
627
core/tx_pool.go
627
core/tx_pool.go
@ -45,8 +45,11 @@ var (
|
||||
ErrNegativeValue = errors.New("Negative value")
|
||||
)
|
||||
|
||||
const (
|
||||
maxQueued = 64 // max limit of queued txs per address
|
||||
var (
|
||||
maxQueuedPerAccount = uint64(64) // Max limit of queued transactions per address
|
||||
maxQueuedInTotal = uint64(65536) // Max limit of queued transactions from all accounts
|
||||
maxQueuedLifetime = 3 * time.Hour // Max amount of time transactions from idle accounts are queued
|
||||
evictionInterval = time.Minute // Time interval to check for evictable transactions
|
||||
)
|
||||
|
||||
type stateFn func() (*state.StateDB, error)
|
||||
@ -68,10 +71,14 @@ type TxPool struct {
|
||||
events event.Subscription
|
||||
localTx *txSet
|
||||
mu sync.RWMutex
|
||||
pending map[common.Hash]*types.Transaction // processable transactions
|
||||
queue map[common.Address]map[common.Hash]*types.Transaction
|
||||
|
||||
wg sync.WaitGroup // for shutdown sync
|
||||
pending map[common.Address]*txList // All currently processable transactions
|
||||
queue map[common.Address]*txList // Queued but non-processable transactions
|
||||
all map[common.Hash]*types.Transaction // All transactions to allow lookups
|
||||
beats map[common.Address]time.Time // Last heartbeat from each known account
|
||||
|
||||
wg sync.WaitGroup // for shutdown sync
|
||||
quit chan struct{}
|
||||
|
||||
homestead bool
|
||||
}
|
||||
@ -79,8 +86,10 @@ type TxPool struct {
|
||||
func NewTxPool(config *ChainConfig, eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool {
|
||||
pool := &TxPool{
|
||||
config: config,
|
||||
pending: make(map[common.Hash]*types.Transaction),
|
||||
queue: make(map[common.Address]map[common.Hash]*types.Transaction),
|
||||
pending: make(map[common.Address]*txList),
|
||||
queue: make(map[common.Address]*txList),
|
||||
all: make(map[common.Hash]*types.Transaction),
|
||||
beats: make(map[common.Address]time.Time),
|
||||
eventMux: eventMux,
|
||||
currentState: currentStateFn,
|
||||
gasLimit: gasLimitFn,
|
||||
@ -88,10 +97,12 @@ func NewTxPool(config *ChainConfig, eventMux *event.TypeMux, currentStateFn stat
|
||||
pendingState: nil,
|
||||
localTx: newTxSet(),
|
||||
events: eventMux.Subscribe(ChainHeadEvent{}, GasPriceChanged{}, RemovedTransactionEvent{}),
|
||||
quit: make(chan struct{}),
|
||||
}
|
||||
|
||||
pool.wg.Add(1)
|
||||
pool.wg.Add(2)
|
||||
go pool.eventLoop()
|
||||
go pool.expirationLoop()
|
||||
|
||||
return pool
|
||||
}
|
||||
@ -117,7 +128,7 @@ func (pool *TxPool) eventLoop() {
|
||||
pool.minGasPrice = ev.Price
|
||||
pool.mu.Unlock()
|
||||
case RemovedTransactionEvent:
|
||||
pool.AddTransactions(ev.Txs)
|
||||
pool.AddBatch(ev.Txs)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -125,12 +136,12 @@ func (pool *TxPool) eventLoop() {
|
||||
func (pool *TxPool) resetState() {
|
||||
currentState, err := pool.currentState()
|
||||
if err != nil {
|
||||
glog.V(logger.Info).Infoln("failed to get current state: %v", err)
|
||||
glog.V(logger.Error).Infof("Failed to get current state: %v", err)
|
||||
return
|
||||
}
|
||||
managedState := state.ManageState(currentState)
|
||||
if err != nil {
|
||||
glog.V(logger.Info).Infoln("failed to get managed state: %v", err)
|
||||
glog.V(logger.Error).Infof("Failed to get managed state: %v", err)
|
||||
return
|
||||
}
|
||||
pool.pendingState = managedState
|
||||
@ -139,26 +150,21 @@ func (pool *TxPool) resetState() {
|
||||
// any transactions that have been included in the block or
|
||||
// have been invalidated because of another transaction (e.g.
|
||||
// higher gas price)
|
||||
pool.validatePool()
|
||||
pool.demoteUnexecutables()
|
||||
|
||||
// Loop over the pending transactions and base the nonce of the new
|
||||
// pending transaction set.
|
||||
for _, tx := range pool.pending {
|
||||
if addr, err := tx.From(); err == nil {
|
||||
// Set the nonce. Transaction nonce can never be lower
|
||||
// than the state nonce; validatePool took care of that.
|
||||
if pool.pendingState.GetNonce(addr) <= tx.Nonce() {
|
||||
pool.pendingState.SetNonce(addr, tx.Nonce()+1)
|
||||
}
|
||||
}
|
||||
// Update all accounts to the latest known pending nonce
|
||||
for addr, list := range pool.pending {
|
||||
txs := list.Flatten() // Heavy but will be cached and is needed by the miner anyway
|
||||
pool.pendingState.SetNonce(addr, txs[len(txs)-1].Nonce()+1)
|
||||
}
|
||||
// Check the queue and move transactions over to the pending if possible
|
||||
// or remove those that have become invalid
|
||||
pool.checkQueue()
|
||||
pool.promoteExecutables()
|
||||
}
|
||||
|
||||
func (pool *TxPool) Stop() {
|
||||
pool.events.Unsubscribe()
|
||||
close(pool.quit)
|
||||
pool.wg.Wait()
|
||||
glog.V(logger.Info).Infoln("Transaction pool stopped")
|
||||
}
|
||||
@ -170,47 +176,58 @@ func (pool *TxPool) State() *state.ManagedState {
|
||||
return pool.pendingState
|
||||
}
|
||||
|
||||
// Stats retrieves the current pool stats, namely the number of pending and the
|
||||
// number of queued (non-executable) transactions.
|
||||
func (pool *TxPool) Stats() (pending int, queued int) {
|
||||
pool.mu.RLock()
|
||||
defer pool.mu.RUnlock()
|
||||
|
||||
pending = len(pool.pending)
|
||||
for _, txs := range pool.queue {
|
||||
queued += len(txs)
|
||||
for _, list := range pool.pending {
|
||||
pending += list.Len()
|
||||
}
|
||||
for _, list := range pool.queue {
|
||||
queued += list.Len()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Content retrieves the data content of the transaction pool, returning all the
|
||||
// pending as well as queued transactions, grouped by account and nonce.
|
||||
func (pool *TxPool) Content() (map[common.Address]map[uint64][]*types.Transaction, map[common.Address]map[uint64][]*types.Transaction) {
|
||||
// pending as well as queued transactions, grouped by account and sorted by nonce.
|
||||
func (pool *TxPool) Content() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) {
|
||||
pool.mu.RLock()
|
||||
defer pool.mu.RUnlock()
|
||||
|
||||
// Retrieve all the pending transactions and sort by account and by nonce
|
||||
pending := make(map[common.Address]map[uint64][]*types.Transaction)
|
||||
for _, tx := range pool.pending {
|
||||
account, _ := tx.From()
|
||||
|
||||
owned, ok := pending[account]
|
||||
if !ok {
|
||||
owned = make(map[uint64][]*types.Transaction)
|
||||
pending[account] = owned
|
||||
}
|
||||
owned[tx.Nonce()] = append(owned[tx.Nonce()], tx)
|
||||
pending := make(map[common.Address]types.Transactions)
|
||||
for addr, list := range pool.pending {
|
||||
pending[addr] = list.Flatten()
|
||||
}
|
||||
// Retrieve all the queued transactions and sort by account and by nonce
|
||||
queued := make(map[common.Address]map[uint64][]*types.Transaction)
|
||||
for account, txs := range pool.queue {
|
||||
owned := make(map[uint64][]*types.Transaction)
|
||||
for _, tx := range txs {
|
||||
owned[tx.Nonce()] = append(owned[tx.Nonce()], tx)
|
||||
}
|
||||
queued[account] = owned
|
||||
queued := make(map[common.Address]types.Transactions)
|
||||
for addr, list := range pool.queue {
|
||||
queued[addr] = list.Flatten()
|
||||
}
|
||||
return pending, queued
|
||||
}
|
||||
|
||||
// Pending retrieves all currently processable transactions, groupped by origin
|
||||
// account and sorted by nonce. The returned transaction set is a copy and can be
|
||||
// freely modified by calling code.
|
||||
func (pool *TxPool) Pending() map[common.Address]types.Transactions {
|
||||
pool.mu.Lock()
|
||||
defer pool.mu.Unlock()
|
||||
|
||||
// check queue first
|
||||
pool.promoteExecutables()
|
||||
|
||||
// invalidate any txs
|
||||
pool.demoteUnexecutables()
|
||||
|
||||
pending := make(map[common.Address]types.Transactions)
|
||||
for addr, list := range pool.pending {
|
||||
pending[addr] = list.Flatten()
|
||||
}
|
||||
return pending
|
||||
}
|
||||
|
||||
// SetLocal marks a transaction as local, skipping gas price
|
||||
// check against local miner minimum in the future
|
||||
func (pool *TxPool) SetLocal(tx *types.Transaction) {
|
||||
@ -276,312 +293,348 @@ func (pool *TxPool) validateTx(tx *types.Transaction) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// validate and queue transactions.
|
||||
func (self *TxPool) add(tx *types.Transaction) error {
|
||||
// add validates a transaction and inserts it into the non-executable queue for
|
||||
// later pending promotion and execution.
|
||||
func (pool *TxPool) add(tx *types.Transaction) error {
|
||||
// If the transaction is alreayd known, discard it
|
||||
hash := tx.Hash()
|
||||
|
||||
if self.pending[hash] != nil {
|
||||
return fmt.Errorf("Known transaction (%x)", hash[:4])
|
||||
if pool.all[hash] != nil {
|
||||
return fmt.Errorf("Known transaction: %x", hash[:4])
|
||||
}
|
||||
err := self.validateTx(tx)
|
||||
if err != nil {
|
||||
// Otherwise ensure basic validation passes and queue it up
|
||||
if err := pool.validateTx(tx); err != nil {
|
||||
return err
|
||||
}
|
||||
self.queueTx(hash, tx)
|
||||
pool.enqueueTx(hash, tx)
|
||||
|
||||
// Print a log message if low enough level is set
|
||||
if glog.V(logger.Debug) {
|
||||
var toname string
|
||||
rcpt := "[NEW_CONTRACT]"
|
||||
if to := tx.To(); to != nil {
|
||||
toname = common.Bytes2Hex(to[:4])
|
||||
} else {
|
||||
toname = "[NEW_CONTRACT]"
|
||||
rcpt = common.Bytes2Hex(to[:4])
|
||||
}
|
||||
// we can ignore the error here because From is
|
||||
// verified in ValidateTransaction.
|
||||
f, _ := tx.From()
|
||||
from := common.Bytes2Hex(f[:4])
|
||||
glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, hash)
|
||||
from, _ := tx.From() // from already verified during tx validation
|
||||
glog.Infof("(t) 0x%x => %s (%v) %x\n", from[:4], rcpt, tx.Value, hash)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// queueTx will queue an unknown transaction
|
||||
func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) {
|
||||
// enqueueTx inserts a new transaction into the non-executable transaction queue.
|
||||
//
|
||||
// Note, this method assumes the pool lock is held!
|
||||
func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) {
|
||||
// Try to insert the transaction into the future queue
|
||||
from, _ := tx.From() // already validated
|
||||
if self.queue[from] == nil {
|
||||
self.queue[from] = make(map[common.Hash]*types.Transaction)
|
||||
if pool.queue[from] == nil {
|
||||
pool.queue[from] = newTxList(false)
|
||||
}
|
||||
self.queue[from][hash] = tx
|
||||
inserted, old := pool.queue[from].Add(tx)
|
||||
if !inserted {
|
||||
return // An older transaction was better, discard this
|
||||
}
|
||||
// Discard any previous transaction and mark this
|
||||
if old != nil {
|
||||
delete(pool.all, old.Hash())
|
||||
}
|
||||
pool.all[hash] = tx
|
||||
}
|
||||
|
||||
// addTx will add a transaction to the pending (processable queue) list of transactions
|
||||
func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Transaction) {
|
||||
// init delayed since tx pool could have been started before any state sync
|
||||
// promoteTx adds a transaction to the pending (processable) list of transactions.
|
||||
//
|
||||
// Note, this method assumes the pool lock is held!
|
||||
func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) {
|
||||
// Init delayed since tx pool could have been started before any state sync
|
||||
if pool.pendingState == nil {
|
||||
pool.resetState()
|
||||
}
|
||||
|
||||
if _, ok := pool.pending[hash]; !ok {
|
||||
pool.pending[hash] = tx
|
||||
|
||||
// Increment the nonce on the pending state. This can only happen if
|
||||
// the nonce is +1 to the previous one.
|
||||
pool.pendingState.SetNonce(addr, tx.Nonce()+1)
|
||||
// Notify the subscribers. This event is posted in a goroutine
|
||||
// because it's possible that somewhere during the post "Remove transaction"
|
||||
// gets called which will then wait for the global tx pool lock and deadlock.
|
||||
go pool.eventMux.Post(TxPreEvent{tx})
|
||||
// Try to insert the transaction into the pending queue
|
||||
if pool.pending[addr] == nil {
|
||||
pool.pending[addr] = newTxList(true)
|
||||
}
|
||||
list := pool.pending[addr]
|
||||
|
||||
inserted, old := list.Add(tx)
|
||||
if !inserted {
|
||||
// An older transaction was better, discard this
|
||||
delete(pool.all, hash)
|
||||
return
|
||||
}
|
||||
// Otherwise discard any previous transaction and mark this
|
||||
if old != nil {
|
||||
delete(pool.all, old.Hash())
|
||||
}
|
||||
pool.all[hash] = tx // Failsafe to work around direct pending inserts (tests)
|
||||
|
||||
// Set the potentially new pending nonce and notify any subsystems of the new tx
|
||||
pool.beats[addr] = time.Now()
|
||||
pool.pendingState.SetNonce(addr, tx.Nonce()+1)
|
||||
go pool.eventMux.Post(TxPreEvent{tx})
|
||||
}
|
||||
|
||||
// Add queues a single transaction in the pool if it is valid.
|
||||
func (self *TxPool) Add(tx *types.Transaction) error {
|
||||
self.mu.Lock()
|
||||
defer self.mu.Unlock()
|
||||
|
||||
if err := self.add(tx); err != nil {
|
||||
return err
|
||||
}
|
||||
self.checkQueue()
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddTransactions attempts to queue all valid transactions in txs.
|
||||
func (self *TxPool) AddTransactions(txs []*types.Transaction) {
|
||||
self.mu.Lock()
|
||||
defer self.mu.Unlock()
|
||||
|
||||
for _, tx := range txs {
|
||||
if err := self.add(tx); err != nil {
|
||||
glog.V(logger.Debug).Infoln("tx error:", err)
|
||||
} else {
|
||||
h := tx.Hash()
|
||||
glog.V(logger.Debug).Infof("tx %x\n", h[:4])
|
||||
}
|
||||
}
|
||||
|
||||
// check and validate the queue
|
||||
self.checkQueue()
|
||||
}
|
||||
|
||||
// GetTransaction returns a transaction if it is contained in the pool
|
||||
// and nil otherwise.
|
||||
func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction {
|
||||
tp.mu.RLock()
|
||||
defer tp.mu.RUnlock()
|
||||
|
||||
// check the txs first
|
||||
if tx, ok := tp.pending[hash]; ok {
|
||||
return tx
|
||||
}
|
||||
// check queue
|
||||
for _, txs := range tp.queue {
|
||||
if tx, ok := txs[hash]; ok {
|
||||
return tx
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetTransactions returns all currently processable transactions.
|
||||
// The returned slice may be modified by the caller.
|
||||
func (self *TxPool) GetTransactions() (txs types.Transactions) {
|
||||
self.mu.Lock()
|
||||
defer self.mu.Unlock()
|
||||
|
||||
// check queue first
|
||||
self.checkQueue()
|
||||
// invalidate any txs
|
||||
self.validatePool()
|
||||
|
||||
txs = make(types.Transactions, len(self.pending))
|
||||
i := 0
|
||||
for _, tx := range self.pending {
|
||||
txs[i] = tx
|
||||
i++
|
||||
}
|
||||
return txs
|
||||
}
|
||||
|
||||
// GetQueuedTransactions returns all non-processable transactions.
|
||||
func (self *TxPool) GetQueuedTransactions() types.Transactions {
|
||||
self.mu.RLock()
|
||||
defer self.mu.RUnlock()
|
||||
|
||||
var ret types.Transactions
|
||||
for _, txs := range self.queue {
|
||||
for _, tx := range txs {
|
||||
ret = append(ret, tx)
|
||||
}
|
||||
}
|
||||
sort.Sort(types.TxByNonce(ret))
|
||||
return ret
|
||||
}
|
||||
|
||||
// RemoveTransactions removes all given transactions from the pool.
|
||||
func (self *TxPool) RemoveTransactions(txs types.Transactions) {
|
||||
self.mu.Lock()
|
||||
defer self.mu.Unlock()
|
||||
for _, tx := range txs {
|
||||
self.removeTx(tx.Hash())
|
||||
}
|
||||
}
|
||||
|
||||
// RemoveTx removes the transaction with the given hash from the pool.
|
||||
func (pool *TxPool) RemoveTx(hash common.Hash) {
|
||||
func (pool *TxPool) Add(tx *types.Transaction) error {
|
||||
pool.mu.Lock()
|
||||
defer pool.mu.Unlock()
|
||||
|
||||
if err := pool.add(tx); err != nil {
|
||||
return err
|
||||
}
|
||||
pool.promoteExecutables()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddBatch attempts to queue a batch of transactions.
|
||||
func (pool *TxPool) AddBatch(txs []*types.Transaction) {
|
||||
pool.mu.Lock()
|
||||
defer pool.mu.Unlock()
|
||||
|
||||
for _, tx := range txs {
|
||||
if err := pool.add(tx); err != nil {
|
||||
glog.V(logger.Debug).Infoln("tx error:", err)
|
||||
}
|
||||
}
|
||||
pool.promoteExecutables()
|
||||
}
|
||||
|
||||
// Get returns a transaction if it is contained in the pool
|
||||
// and nil otherwise.
|
||||
func (pool *TxPool) Get(hash common.Hash) *types.Transaction {
|
||||
pool.mu.RLock()
|
||||
defer pool.mu.RUnlock()
|
||||
|
||||
return pool.all[hash]
|
||||
}
|
||||
|
||||
// Remove removes the transaction with the given hash from the pool.
|
||||
func (pool *TxPool) Remove(hash common.Hash) {
|
||||
pool.mu.Lock()
|
||||
defer pool.mu.Unlock()
|
||||
|
||||
pool.removeTx(hash)
|
||||
}
|
||||
|
||||
// RemoveBatch removes all given transactions from the pool.
|
||||
func (pool *TxPool) RemoveBatch(txs types.Transactions) {
|
||||
pool.mu.Lock()
|
||||
defer pool.mu.Unlock()
|
||||
|
||||
for _, tx := range txs {
|
||||
pool.removeTx(tx.Hash())
|
||||
}
|
||||
}
|
||||
|
||||
// removeTx removes a single transaction from the queue, moving all subsequent
|
||||
// transactions back to the future queue.
|
||||
func (pool *TxPool) removeTx(hash common.Hash) {
|
||||
// delete from pending pool
|
||||
delete(pool.pending, hash)
|
||||
// delete from queue
|
||||
for address, txs := range pool.queue {
|
||||
if _, ok := txs[hash]; ok {
|
||||
if len(txs) == 1 {
|
||||
// if only one tx, remove entire address entry.
|
||||
delete(pool.queue, address)
|
||||
// Fetch the transaction we wish to delete
|
||||
tx, ok := pool.all[hash]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
addr, _ := tx.From() // already validated during insertion
|
||||
|
||||
// Remove it from the list of known transactions
|
||||
delete(pool.all, hash)
|
||||
|
||||
// Remove the transaction from the pending lists and reset the account nonce
|
||||
if pending := pool.pending[addr]; pending != nil {
|
||||
if removed, invalids := pending.Remove(tx); removed {
|
||||
// If no more transactions are left, remove the list
|
||||
if pending.Empty() {
|
||||
delete(pool.pending, addr)
|
||||
delete(pool.beats, addr)
|
||||
} else {
|
||||
delete(txs, hash)
|
||||
// Otherwise postpone any invalidated transactions
|
||||
for _, tx := range invalids {
|
||||
pool.enqueueTx(tx.Hash(), tx)
|
||||
}
|
||||
}
|
||||
break
|
||||
// Update the account nonce if needed
|
||||
if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce {
|
||||
pool.pendingState.SetNonce(addr, tx.Nonce())
|
||||
}
|
||||
}
|
||||
}
|
||||
// Transaction is in the future queue
|
||||
if future := pool.queue[addr]; future != nil {
|
||||
future.Remove(tx)
|
||||
if future.Empty() {
|
||||
delete(pool.queue, addr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// checkQueue moves transactions that have become processable to main pool.
|
||||
func (pool *TxPool) checkQueue() {
|
||||
// init delayed since tx pool could have been started before any state sync
|
||||
// promoteExecutables moves transactions that have become processable from the
|
||||
// future queue to the set of pending transactions. During this process, all
|
||||
// invalidated transactions (low nonce, low balance) are deleted.
|
||||
func (pool *TxPool) promoteExecutables() {
|
||||
// Init delayed since tx pool could have been started before any state sync
|
||||
if pool.pendingState == nil {
|
||||
pool.resetState()
|
||||
}
|
||||
// Retrieve the current state to allow nonce and balance checking
|
||||
state, err := pool.currentState()
|
||||
if err != nil {
|
||||
glog.Errorf("Could not get current state: %v", err)
|
||||
return
|
||||
}
|
||||
// Iterate over all accounts and promote any executable transactions
|
||||
queued := uint64(0)
|
||||
|
||||
var promote txQueue
|
||||
for address, txs := range pool.queue {
|
||||
currentState, err := pool.currentState()
|
||||
if err != nil {
|
||||
glog.Errorf("could not get current state: %v", err)
|
||||
return
|
||||
for addr, list := range pool.queue {
|
||||
// Drop all transactions that are deemed too old (low nonce)
|
||||
for _, tx := range list.Forward(state.GetNonce(addr)) {
|
||||
if glog.V(logger.Core) {
|
||||
glog.Infof("Removed old queued transaction: %v", tx)
|
||||
}
|
||||
delete(pool.all, tx.Hash())
|
||||
}
|
||||
balance := currentState.GetBalance(address)
|
||||
// Drop all transactions that are too costly (low balance)
|
||||
drops, _ := list.Filter(state.GetBalance(addr))
|
||||
for _, tx := range drops {
|
||||
if glog.V(logger.Core) {
|
||||
glog.Infof("Removed unpayable queued transaction: %v", tx)
|
||||
}
|
||||
delete(pool.all, tx.Hash())
|
||||
}
|
||||
// Gather all executable transactions and promote them
|
||||
for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) {
|
||||
if glog.V(logger.Core) {
|
||||
glog.Infof("Promoting queued transaction: %v", tx)
|
||||
}
|
||||
pool.promoteTx(addr, tx.Hash(), tx)
|
||||
}
|
||||
// Drop all transactions over the allowed limit
|
||||
for _, tx := range list.Cap(int(maxQueuedPerAccount)) {
|
||||
if glog.V(logger.Core) {
|
||||
glog.Infof("Removed cap-exceeding queued transaction: %v", tx)
|
||||
}
|
||||
delete(pool.all, tx.Hash())
|
||||
}
|
||||
queued += uint64(list.Len())
|
||||
|
||||
var (
|
||||
guessedNonce = pool.pendingState.GetNonce(address) // nonce currently kept by the tx pool (pending state)
|
||||
trueNonce = currentState.GetNonce(address) // nonce known by the last state
|
||||
)
|
||||
promote = promote[:0]
|
||||
for hash, tx := range txs {
|
||||
// Drop processed or out of fund transactions
|
||||
if tx.Nonce() < trueNonce || balance.Cmp(tx.Cost()) < 0 {
|
||||
if glog.V(logger.Core) {
|
||||
glog.Infof("removed tx (%v) from pool queue: low tx nonce or out of funds\n", tx)
|
||||
// Delete the entire queue entry if it became empty.
|
||||
if list.Empty() {
|
||||
delete(pool.queue, addr)
|
||||
}
|
||||
}
|
||||
// If we've queued more transactions than the hard limit, drop oldest ones
|
||||
if queued > maxQueuedInTotal {
|
||||
// Sort all accounts with queued transactions by heartbeat
|
||||
addresses := make(addresssByHeartbeat, 0, len(pool.queue))
|
||||
for addr, _ := range pool.queue {
|
||||
addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
|
||||
}
|
||||
sort.Sort(addresses)
|
||||
|
||||
// Drop transactions until the total is below the limit
|
||||
for drop := queued - maxQueuedInTotal; drop > 0; {
|
||||
addr := addresses[len(addresses)-1]
|
||||
list := pool.queue[addr.address]
|
||||
|
||||
addresses = addresses[:len(addresses)-1]
|
||||
|
||||
// Drop all transactions if they are less than the overflow
|
||||
if size := uint64(list.Len()); size <= drop {
|
||||
for _, tx := range list.Flatten() {
|
||||
pool.removeTx(tx.Hash())
|
||||
}
|
||||
delete(txs, hash)
|
||||
drop -= size
|
||||
continue
|
||||
}
|
||||
// Collect the remaining transactions for the next pass.
|
||||
promote = append(promote, txQueueEntry{hash, address, tx})
|
||||
}
|
||||
// Find the next consecutive nonce range starting at the current account nonce,
|
||||
// pushing the guessed nonce forward if we add consecutive transactions.
|
||||
sort.Sort(promote)
|
||||
for i, entry := range promote {
|
||||
// If we reached a gap in the nonces, enforce transaction limit and stop
|
||||
if entry.Nonce() > guessedNonce {
|
||||
if len(promote)-i > maxQueued {
|
||||
if glog.V(logger.Debug) {
|
||||
glog.Infof("Queued tx limit exceeded for %s. Tx %s removed\n", common.PP(address[:]), common.PP(entry.hash[:]))
|
||||
}
|
||||
for _, drop := range promote[i+maxQueued:] {
|
||||
delete(txs, drop.hash)
|
||||
}
|
||||
}
|
||||
break
|
||||
// Otherwise drop only last few transactions
|
||||
txs := list.Flatten()
|
||||
for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
|
||||
pool.removeTx(txs[i].Hash())
|
||||
drop--
|
||||
}
|
||||
// Otherwise promote the transaction and move the guess nonce if needed
|
||||
pool.addTx(entry.hash, address, entry.Transaction)
|
||||
delete(txs, entry.hash)
|
||||
|
||||
if entry.Nonce() == guessedNonce {
|
||||
guessedNonce++
|
||||
}
|
||||
}
|
||||
// Delete the entire queue entry if it became empty.
|
||||
if len(txs) == 0 {
|
||||
delete(pool.queue, address)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// validatePool removes invalid and processed transactions from the main pool.
|
||||
// If a transaction is removed for being invalid (e.g. out of funds), all sub-
|
||||
// sequent (Still valid) transactions are moved back into the future queue. This
|
||||
// is important to prevent a drained account from DOSing the network with non
|
||||
// executable transactions.
|
||||
func (pool *TxPool) validatePool() {
|
||||
// demoteUnexecutables removes invalid and processed transactions from the pools
|
||||
// executable/pending queue and any subsequent transactions that become unexecutable
|
||||
// are moved back into the future queue.
|
||||
func (pool *TxPool) demoteUnexecutables() {
|
||||
// Retrieve the current state to allow nonce and balance checking
|
||||
state, err := pool.currentState()
|
||||
if err != nil {
|
||||
glog.V(logger.Info).Infoln("failed to get current state: %v", err)
|
||||
return
|
||||
}
|
||||
balanceCache := make(map[common.Address]*big.Int)
|
||||
// Iterate over all accounts and demote any non-executable transactions
|
||||
for addr, list := range pool.pending {
|
||||
nonce := state.GetNonce(addr)
|
||||
|
||||
// Clean up the pending pool, accumulating invalid nonces
|
||||
gaps := make(map[common.Address]uint64)
|
||||
|
||||
for hash, tx := range pool.pending {
|
||||
sender, _ := tx.From() // err already checked
|
||||
|
||||
// Perform light nonce and balance validation
|
||||
balance := balanceCache[sender]
|
||||
if balance == nil {
|
||||
balance = state.GetBalance(sender)
|
||||
balanceCache[sender] = balance
|
||||
}
|
||||
if past := state.GetNonce(sender) > tx.Nonce(); past || balance.Cmp(tx.Cost()) < 0 {
|
||||
// Remove an already past it invalidated transaction
|
||||
// Drop all transactions that are deemed too old (low nonce)
|
||||
for _, tx := range list.Forward(nonce) {
|
||||
if glog.V(logger.Core) {
|
||||
glog.Infof("removed tx (%v) from pool: low tx nonce or out of funds\n", tx)
|
||||
}
|
||||
delete(pool.pending, hash)
|
||||
|
||||
// Track the smallest invalid nonce to postpone subsequent transactions
|
||||
if !past {
|
||||
if prev, ok := gaps[sender]; !ok || tx.Nonce() < prev {
|
||||
gaps[sender] = tx.Nonce()
|
||||
}
|
||||
glog.Infof("Removed old pending transaction: %v", tx)
|
||||
}
|
||||
delete(pool.all, tx.Hash())
|
||||
}
|
||||
}
|
||||
// Move all transactions after a gap back to the future queue
|
||||
if len(gaps) > 0 {
|
||||
for hash, tx := range pool.pending {
|
||||
sender, _ := tx.From()
|
||||
if gap, ok := gaps[sender]; ok && tx.Nonce() >= gap {
|
||||
if glog.V(logger.Core) {
|
||||
glog.Infof("postponed tx (%v) due to introduced gap\n", tx)
|
||||
}
|
||||
pool.queueTx(hash, tx)
|
||||
delete(pool.pending, hash)
|
||||
// Drop all transactions that are too costly (low balance), and queue any invalids back for later
|
||||
drops, invalids := list.Filter(state.GetBalance(addr))
|
||||
for _, tx := range drops {
|
||||
if glog.V(logger.Core) {
|
||||
glog.Infof("Removed unpayable pending transaction: %v", tx)
|
||||
}
|
||||
delete(pool.all, tx.Hash())
|
||||
}
|
||||
for _, tx := range invalids {
|
||||
if glog.V(logger.Core) {
|
||||
glog.Infof("Demoting pending transaction: %v", tx)
|
||||
}
|
||||
pool.enqueueTx(tx.Hash(), tx)
|
||||
}
|
||||
// Delete the entire queue entry if it became empty.
|
||||
if list.Empty() {
|
||||
delete(pool.pending, addr)
|
||||
delete(pool.beats, addr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type txQueue []txQueueEntry
|
||||
// expirationLoop is a loop that periodically iterates over all accounts with
|
||||
// queued transactions and drop all that have been inactive for a prolonged amount
|
||||
// of time.
|
||||
func (pool *TxPool) expirationLoop() {
|
||||
defer pool.wg.Done()
|
||||
|
||||
type txQueueEntry struct {
|
||||
hash common.Hash
|
||||
addr common.Address
|
||||
*types.Transaction
|
||||
evict := time.NewTicker(evictionInterval)
|
||||
defer evict.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-evict.C:
|
||||
pool.mu.Lock()
|
||||
for addr := range pool.queue {
|
||||
if time.Since(pool.beats[addr]) > maxQueuedLifetime {
|
||||
for _, tx := range pool.queue[addr].Flatten() {
|
||||
pool.removeTx(tx.Hash())
|
||||
}
|
||||
}
|
||||
}
|
||||
pool.mu.Unlock()
|
||||
|
||||
case <-pool.quit:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (q txQueue) Len() int { return len(q) }
|
||||
func (q txQueue) Swap(i, j int) { q[i], q[j] = q[j], q[i] }
|
||||
func (q txQueue) Less(i, j int) bool { return q[i].Nonce() < q[j].Nonce() }
|
||||
// addressByHeartbeat is an account address tagged with its last activity timestamp.
|
||||
type addressByHeartbeat struct {
|
||||
address common.Address
|
||||
heartbeat time.Time
|
||||
}
|
||||
|
||||
type addresssByHeartbeat []addressByHeartbeat
|
||||
|
||||
func (a addresssByHeartbeat) Len() int { return len(a) }
|
||||
func (a addresssByHeartbeat) Less(i, j int) bool { return a[i].heartbeat.Before(a[j].heartbeat) }
|
||||
func (a addresssByHeartbeat) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
|
||||
// txSet represents a set of transaction hashes in which entries
|
||||
// are automatically dropped after txSetDuration time
|
||||
|
@ -19,7 +19,9 @@ package core
|
||||
import (
|
||||
"crypto/ecdsa"
|
||||
"math/big"
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/state"
|
||||
@ -38,10 +40,10 @@ func setupTxPool() (*TxPool, *ecdsa.PrivateKey) {
|
||||
db, _ := ethdb.NewMemDatabase()
|
||||
statedb, _ := state.New(common.Hash{}, db)
|
||||
|
||||
var m event.TypeMux
|
||||
key, _ := crypto.GenerateKey()
|
||||
newPool := NewTxPool(testChainConfig(), &m, func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
|
||||
newPool := NewTxPool(testChainConfig(), new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
|
||||
newPool.resetState()
|
||||
|
||||
return newPool, key
|
||||
}
|
||||
|
||||
@ -91,9 +93,9 @@ func TestTransactionQueue(t *testing.T) {
|
||||
from, _ := tx.From()
|
||||
currentState, _ := pool.currentState()
|
||||
currentState.AddBalance(from, big.NewInt(1000))
|
||||
pool.queueTx(tx.Hash(), tx)
|
||||
pool.enqueueTx(tx.Hash(), tx)
|
||||
|
||||
pool.checkQueue()
|
||||
pool.promoteExecutables()
|
||||
if len(pool.pending) != 1 {
|
||||
t.Error("expected valid txs to be 1 is", len(pool.pending))
|
||||
}
|
||||
@ -101,14 +103,14 @@ func TestTransactionQueue(t *testing.T) {
|
||||
tx = transaction(1, big.NewInt(100), key)
|
||||
from, _ = tx.From()
|
||||
currentState.SetNonce(from, 2)
|
||||
pool.queueTx(tx.Hash(), tx)
|
||||
pool.checkQueue()
|
||||
if _, ok := pool.pending[tx.Hash()]; ok {
|
||||
pool.enqueueTx(tx.Hash(), tx)
|
||||
pool.promoteExecutables()
|
||||
if _, ok := pool.pending[from].txs.items[tx.Nonce()]; ok {
|
||||
t.Error("expected transaction to be in tx pool")
|
||||
}
|
||||
|
||||
if len(pool.queue[from]) > 0 {
|
||||
t.Error("expected transaction queue to be empty. is", len(pool.queue[from]))
|
||||
if len(pool.queue) > 0 {
|
||||
t.Error("expected transaction queue to be empty. is", len(pool.queue))
|
||||
}
|
||||
|
||||
pool, key = setupTxPool()
|
||||
@ -118,17 +120,17 @@ func TestTransactionQueue(t *testing.T) {
|
||||
from, _ = tx1.From()
|
||||
currentState, _ = pool.currentState()
|
||||
currentState.AddBalance(from, big.NewInt(1000))
|
||||
pool.queueTx(tx1.Hash(), tx1)
|
||||
pool.queueTx(tx2.Hash(), tx2)
|
||||
pool.queueTx(tx3.Hash(), tx3)
|
||||
pool.enqueueTx(tx1.Hash(), tx1)
|
||||
pool.enqueueTx(tx2.Hash(), tx2)
|
||||
pool.enqueueTx(tx3.Hash(), tx3)
|
||||
|
||||
pool.checkQueue()
|
||||
pool.promoteExecutables()
|
||||
|
||||
if len(pool.pending) != 1 {
|
||||
t.Error("expected tx pool to be 1, got", len(pool.pending))
|
||||
}
|
||||
if len(pool.queue[from]) != 2 {
|
||||
t.Error("expected len(queue) == 2, got", len(pool.queue[from]))
|
||||
if pool.queue[from].Len() != 2 {
|
||||
t.Error("expected len(queue) == 2, got", pool.queue[from].Len())
|
||||
}
|
||||
}
|
||||
|
||||
@ -138,24 +140,21 @@ func TestRemoveTx(t *testing.T) {
|
||||
from, _ := tx.From()
|
||||
currentState, _ := pool.currentState()
|
||||
currentState.AddBalance(from, big.NewInt(1))
|
||||
pool.queueTx(tx.Hash(), tx)
|
||||
pool.addTx(tx.Hash(), from, tx)
|
||||
|
||||
pool.enqueueTx(tx.Hash(), tx)
|
||||
pool.promoteTx(from, tx.Hash(), tx)
|
||||
if len(pool.queue) != 1 {
|
||||
t.Error("expected queue to be 1, got", len(pool.queue))
|
||||
}
|
||||
|
||||
if len(pool.pending) != 1 {
|
||||
t.Error("expected txs to be 1, got", len(pool.pending))
|
||||
t.Error("expected pending to be 1, got", len(pool.pending))
|
||||
}
|
||||
|
||||
pool.RemoveTx(tx.Hash())
|
||||
|
||||
pool.Remove(tx.Hash())
|
||||
if len(pool.queue) > 0 {
|
||||
t.Error("expected queue to be 0, got", len(pool.queue))
|
||||
}
|
||||
|
||||
if len(pool.pending) > 0 {
|
||||
t.Error("expected txs to be 0, got", len(pool.pending))
|
||||
t.Error("expected pending to be 0, got", len(pool.pending))
|
||||
}
|
||||
}
|
||||
|
||||
@ -188,7 +187,7 @@ func TestTransactionChainFork(t *testing.T) {
|
||||
if err := pool.add(tx); err != nil {
|
||||
t.Error("didn't expect error", err)
|
||||
}
|
||||
pool.RemoveTransactions([]*types.Transaction{tx})
|
||||
pool.RemoveBatch([]*types.Transaction{tx})
|
||||
|
||||
// reset the pool's internal state
|
||||
resetState()
|
||||
@ -210,18 +209,38 @@ func TestTransactionDoubleNonce(t *testing.T) {
|
||||
}
|
||||
resetState()
|
||||
|
||||
tx := transaction(0, big.NewInt(100000), key)
|
||||
tx2 := transaction(0, big.NewInt(1000000), key)
|
||||
if err := pool.add(tx); err != nil {
|
||||
tx1, _ := types.NewTransaction(0, common.Address{}, big.NewInt(100), big.NewInt(100000), big.NewInt(1), nil).SignECDSA(key)
|
||||
tx2, _ := types.NewTransaction(0, common.Address{}, big.NewInt(100), big.NewInt(1000000), big.NewInt(2), nil).SignECDSA(key)
|
||||
tx3, _ := types.NewTransaction(0, common.Address{}, big.NewInt(100), big.NewInt(1000000), big.NewInt(1), nil).SignECDSA(key)
|
||||
|
||||
// Add the first two transaction, ensure higher priced stays only
|
||||
if err := pool.add(tx1); err != nil {
|
||||
t.Error("didn't expect error", err)
|
||||
}
|
||||
if err := pool.add(tx2); err != nil {
|
||||
t.Error("didn't expect error", err)
|
||||
}
|
||||
|
||||
pool.checkQueue()
|
||||
if len(pool.pending) != 2 {
|
||||
t.Error("expected 2 pending txs. Got", len(pool.pending))
|
||||
pool.promoteExecutables()
|
||||
if pool.pending[addr].Len() != 1 {
|
||||
t.Error("expected 1 pending transactions, got", pool.pending[addr].Len())
|
||||
}
|
||||
if tx := pool.pending[addr].txs.items[0]; tx.Hash() != tx2.Hash() {
|
||||
t.Errorf("transaction mismatch: have %x, want %x", tx.Hash(), tx2.Hash())
|
||||
}
|
||||
// Add the thid transaction and ensure it's not saved (smaller price)
|
||||
if err := pool.add(tx3); err != nil {
|
||||
t.Error("didn't expect error", err)
|
||||
}
|
||||
pool.promoteExecutables()
|
||||
if pool.pending[addr].Len() != 1 {
|
||||
t.Error("expected 1 pending transactions, got", pool.pending[addr].Len())
|
||||
}
|
||||
if tx := pool.pending[addr].txs.items[0]; tx.Hash() != tx2.Hash() {
|
||||
t.Errorf("transaction mismatch: have %x, want %x", tx.Hash(), tx2.Hash())
|
||||
}
|
||||
// Ensure the total transaction count is correct
|
||||
if len(pool.all) != 1 {
|
||||
t.Error("expected 1 total transactions, got", len(pool.all))
|
||||
}
|
||||
}
|
||||
|
||||
@ -237,8 +256,11 @@ func TestMissingNonce(t *testing.T) {
|
||||
if len(pool.pending) != 0 {
|
||||
t.Error("expected 0 pending transactions, got", len(pool.pending))
|
||||
}
|
||||
if len(pool.queue[addr]) != 1 {
|
||||
t.Error("expected 1 queued transaction, got", len(pool.queue[addr]))
|
||||
if pool.queue[addr].Len() != 1 {
|
||||
t.Error("expected 1 queued transaction, got", pool.queue[addr].Len())
|
||||
}
|
||||
if len(pool.all) != 1 {
|
||||
t.Error("expected 1 total transactions, got", len(pool.all))
|
||||
}
|
||||
}
|
||||
|
||||
@ -270,8 +292,11 @@ func TestRemovedTxEvent(t *testing.T) {
|
||||
currentState.AddBalance(from, big.NewInt(1000000000000))
|
||||
pool.eventMux.Post(RemovedTransactionEvent{types.Transactions{tx}})
|
||||
pool.eventMux.Post(ChainHeadEvent{nil})
|
||||
if len(pool.pending) != 1 {
|
||||
t.Error("expected 1 pending tx, got", len(pool.pending))
|
||||
if pool.pending[from].Len() != 1 {
|
||||
t.Error("expected 1 pending tx, got", pool.pending[from].Len())
|
||||
}
|
||||
if len(pool.all) != 1 {
|
||||
t.Error("expected 1 total transactions, got", len(pool.all))
|
||||
}
|
||||
}
|
||||
|
||||
@ -292,41 +317,50 @@ func TestTransactionDropping(t *testing.T) {
|
||||
tx10 = transaction(10, big.NewInt(100), key)
|
||||
tx11 = transaction(11, big.NewInt(200), key)
|
||||
)
|
||||
pool.addTx(tx0.Hash(), account, tx0)
|
||||
pool.addTx(tx1.Hash(), account, tx1)
|
||||
pool.queueTx(tx10.Hash(), tx10)
|
||||
pool.queueTx(tx11.Hash(), tx11)
|
||||
pool.promoteTx(account, tx0.Hash(), tx0)
|
||||
pool.promoteTx(account, tx1.Hash(), tx1)
|
||||
pool.enqueueTx(tx10.Hash(), tx10)
|
||||
pool.enqueueTx(tx11.Hash(), tx11)
|
||||
|
||||
// Check that pre and post validations leave the pool as is
|
||||
if len(pool.pending) != 2 {
|
||||
t.Errorf("pending transaction mismatch: have %d, want %d", len(pool.pending), 2)
|
||||
if pool.pending[account].Len() != 2 {
|
||||
t.Errorf("pending transaction mismatch: have %d, want %d", pool.pending[account].Len(), 2)
|
||||
}
|
||||
if len(pool.queue[account]) != 2 {
|
||||
t.Errorf("queued transaction mismatch: have %d, want %d", len(pool.queue), 2)
|
||||
if pool.queue[account].Len() != 2 {
|
||||
t.Errorf("queued transaction mismatch: have %d, want %d", pool.queue[account].Len(), 2)
|
||||
}
|
||||
if len(pool.all) != 4 {
|
||||
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 4)
|
||||
}
|
||||
pool.resetState()
|
||||
if len(pool.pending) != 2 {
|
||||
t.Errorf("pending transaction mismatch: have %d, want %d", len(pool.pending), 2)
|
||||
if pool.pending[account].Len() != 2 {
|
||||
t.Errorf("pending transaction mismatch: have %d, want %d", pool.pending[account].Len(), 2)
|
||||
}
|
||||
if len(pool.queue[account]) != 2 {
|
||||
t.Errorf("queued transaction mismatch: have %d, want %d", len(pool.queue), 2)
|
||||
if pool.queue[account].Len() != 2 {
|
||||
t.Errorf("queued transaction mismatch: have %d, want %d", pool.queue[account].Len(), 2)
|
||||
}
|
||||
if len(pool.all) != 4 {
|
||||
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 4)
|
||||
}
|
||||
// Reduce the balance of the account, and check that invalidated transactions are dropped
|
||||
state.AddBalance(account, big.NewInt(-750))
|
||||
pool.resetState()
|
||||
|
||||
if _, ok := pool.pending[tx0.Hash()]; !ok {
|
||||
if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok {
|
||||
t.Errorf("funded pending transaction missing: %v", tx0)
|
||||
}
|
||||
if _, ok := pool.pending[tx1.Hash()]; ok {
|
||||
if _, ok := pool.pending[account].txs.items[tx1.Nonce()]; ok {
|
||||
t.Errorf("out-of-fund pending transaction present: %v", tx1)
|
||||
}
|
||||
if _, ok := pool.queue[account][tx10.Hash()]; !ok {
|
||||
if _, ok := pool.queue[account].txs.items[tx10.Nonce()]; !ok {
|
||||
t.Errorf("funded queued transaction missing: %v", tx10)
|
||||
}
|
||||
if _, ok := pool.queue[account][tx11.Hash()]; ok {
|
||||
if _, ok := pool.queue[account].txs.items[tx11.Nonce()]; ok {
|
||||
t.Errorf("out-of-fund queued transaction present: %v", tx11)
|
||||
}
|
||||
if len(pool.all) != 2 {
|
||||
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 2)
|
||||
}
|
||||
}
|
||||
|
||||
// Tests that if a transaction is dropped from the current pending pool (e.g. out
|
||||
@ -349,55 +383,64 @@ func TestTransactionPostponing(t *testing.T) {
|
||||
} else {
|
||||
tx = transaction(uint64(i), big.NewInt(500), key)
|
||||
}
|
||||
pool.addTx(tx.Hash(), account, tx)
|
||||
pool.promoteTx(account, tx.Hash(), tx)
|
||||
txns = append(txns, tx)
|
||||
}
|
||||
// Check that pre and post validations leave the pool as is
|
||||
if len(pool.pending) != len(txns) {
|
||||
t.Errorf("pending transaction mismatch: have %d, want %d", len(pool.pending), len(txns))
|
||||
if pool.pending[account].Len() != len(txns) {
|
||||
t.Errorf("pending transaction mismatch: have %d, want %d", pool.pending[account].Len(), len(txns))
|
||||
}
|
||||
if len(pool.queue[account]) != 0 {
|
||||
t.Errorf("queued transaction mismatch: have %d, want %d", len(pool.queue), 0)
|
||||
if len(pool.queue) != 0 {
|
||||
t.Errorf("queued transaction mismatch: have %d, want %d", pool.queue[account].Len(), 0)
|
||||
}
|
||||
if len(pool.all) != len(txns) {
|
||||
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), len(txns))
|
||||
}
|
||||
pool.resetState()
|
||||
if len(pool.pending) != len(txns) {
|
||||
t.Errorf("pending transaction mismatch: have %d, want %d", len(pool.pending), len(txns))
|
||||
if pool.pending[account].Len() != len(txns) {
|
||||
t.Errorf("pending transaction mismatch: have %d, want %d", pool.pending[account].Len(), len(txns))
|
||||
}
|
||||
if len(pool.queue[account]) != 0 {
|
||||
t.Errorf("queued transaction mismatch: have %d, want %d", len(pool.queue), 0)
|
||||
if len(pool.queue) != 0 {
|
||||
t.Errorf("queued transaction mismatch: have %d, want %d", pool.queue[account].Len(), 0)
|
||||
}
|
||||
if len(pool.all) != len(txns) {
|
||||
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), len(txns))
|
||||
}
|
||||
// Reduce the balance of the account, and check that transactions are reorganised
|
||||
state.AddBalance(account, big.NewInt(-750))
|
||||
pool.resetState()
|
||||
|
||||
if _, ok := pool.pending[txns[0].Hash()]; !ok {
|
||||
if _, ok := pool.pending[account].txs.items[txns[0].Nonce()]; !ok {
|
||||
t.Errorf("tx %d: valid and funded transaction missing from pending pool: %v", 0, txns[0])
|
||||
}
|
||||
if _, ok := pool.queue[account][txns[0].Hash()]; ok {
|
||||
if _, ok := pool.queue[account].txs.items[txns[0].Nonce()]; ok {
|
||||
t.Errorf("tx %d: valid and funded transaction present in future queue: %v", 0, txns[0])
|
||||
}
|
||||
for i, tx := range txns[1:] {
|
||||
if i%2 == 1 {
|
||||
if _, ok := pool.pending[tx.Hash()]; ok {
|
||||
if _, ok := pool.pending[account].txs.items[tx.Nonce()]; ok {
|
||||
t.Errorf("tx %d: valid but future transaction present in pending pool: %v", i+1, tx)
|
||||
}
|
||||
if _, ok := pool.queue[account][tx.Hash()]; !ok {
|
||||
if _, ok := pool.queue[account].txs.items[tx.Nonce()]; !ok {
|
||||
t.Errorf("tx %d: valid but future transaction missing from future queue: %v", i+1, tx)
|
||||
}
|
||||
} else {
|
||||
if _, ok := pool.pending[tx.Hash()]; ok {
|
||||
if _, ok := pool.pending[account].txs.items[tx.Nonce()]; ok {
|
||||
t.Errorf("tx %d: out-of-fund transaction present in pending pool: %v", i+1, tx)
|
||||
}
|
||||
if _, ok := pool.queue[account][tx.Hash()]; ok {
|
||||
if _, ok := pool.queue[account].txs.items[tx.Nonce()]; ok {
|
||||
t.Errorf("tx %d: out-of-fund transaction present in future queue: %v", i+1, tx)
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(pool.all) != len(txns)/2 {
|
||||
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), len(txns)/2)
|
||||
}
|
||||
}
|
||||
|
||||
// Tests that if the transaction count belonging to a single account goes above
|
||||
// some threshold, the higher transactions are dropped to prevent DOS attacks.
|
||||
func TestTransactionQueueLimiting(t *testing.T) {
|
||||
func TestTransactionQueueAccountLimiting(t *testing.T) {
|
||||
// Create a test account and fund it
|
||||
pool, key := setupTxPool()
|
||||
account, _ := transaction(0, big.NewInt(0), key).From()
|
||||
@ -406,23 +449,104 @@ func TestTransactionQueueLimiting(t *testing.T) {
|
||||
state.AddBalance(account, big.NewInt(1000000))
|
||||
|
||||
// Keep queuing up transactions and make sure all above a limit are dropped
|
||||
for i := uint64(1); i <= maxQueued+5; i++ {
|
||||
for i := uint64(1); i <= maxQueuedPerAccount+5; i++ {
|
||||
if err := pool.Add(transaction(i, big.NewInt(100000), key)); err != nil {
|
||||
t.Fatalf("tx %d: failed to add transaction: %v", i, err)
|
||||
}
|
||||
if len(pool.pending) != 0 {
|
||||
t.Errorf("tx %d: pending pool size mismatch: have %d, want %d", i, len(pool.pending), 0)
|
||||
}
|
||||
if i <= maxQueued {
|
||||
if len(pool.queue[account]) != int(i) {
|
||||
t.Errorf("tx %d: queue size mismatch: have %d, want %d", i, len(pool.queue[account]), i)
|
||||
if i <= maxQueuedPerAccount {
|
||||
if pool.queue[account].Len() != int(i) {
|
||||
t.Errorf("tx %d: queue size mismatch: have %d, want %d", i, pool.queue[account].Len(), i)
|
||||
}
|
||||
} else {
|
||||
if len(pool.queue[account]) != maxQueued {
|
||||
t.Errorf("tx %d: queue limit mismatch: have %d, want %d", i, len(pool.queue[account]), maxQueued)
|
||||
if pool.queue[account].Len() != int(maxQueuedPerAccount) {
|
||||
t.Errorf("tx %d: queue limit mismatch: have %d, want %d", i, pool.queue[account].Len(), maxQueuedPerAccount)
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(pool.all) != int(maxQueuedPerAccount) {
|
||||
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), maxQueuedPerAccount)
|
||||
}
|
||||
}
|
||||
|
||||
// Tests that if the transaction count belonging to multiple accounts go above
|
||||
// some threshold, the higher transactions are dropped to prevent DOS attacks.
|
||||
func TestTransactionQueueGlobalLimiting(t *testing.T) {
|
||||
// Reduce the queue limits to shorten test time
|
||||
defer func(old uint64) { maxQueuedInTotal = old }(maxQueuedInTotal)
|
||||
maxQueuedInTotal = maxQueuedPerAccount * 3
|
||||
|
||||
// Create the pool to test the limit enforcement with
|
||||
db, _ := ethdb.NewMemDatabase()
|
||||
statedb, _ := state.New(common.Hash{}, db)
|
||||
|
||||
pool := NewTxPool(testChainConfig(), new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
|
||||
pool.resetState()
|
||||
|
||||
// Create a number of test accounts and fund them
|
||||
state, _ := pool.currentState()
|
||||
|
||||
keys := make([]*ecdsa.PrivateKey, 5)
|
||||
for i := 0; i < len(keys); i++ {
|
||||
keys[i], _ = crypto.GenerateKey()
|
||||
state.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
|
||||
}
|
||||
// Generate and queue a batch of transactions
|
||||
nonces := make(map[common.Address]uint64)
|
||||
|
||||
txs := make(types.Transactions, 0, 3*maxQueuedInTotal)
|
||||
for len(txs) < cap(txs) {
|
||||
key := keys[rand.Intn(len(keys))]
|
||||
addr := crypto.PubkeyToAddress(key.PublicKey)
|
||||
|
||||
txs = append(txs, transaction(nonces[addr]+1, big.NewInt(100000), key))
|
||||
nonces[addr]++
|
||||
}
|
||||
// Import the batch and verify that limits have been enforced
|
||||
pool.AddBatch(txs)
|
||||
|
||||
queued := 0
|
||||
for addr, list := range pool.queue {
|
||||
if list.Len() > int(maxQueuedPerAccount) {
|
||||
t.Errorf("addr %x: queued accounts overflown allowance: %d > %d", addr, list.Len(), maxQueuedPerAccount)
|
||||
}
|
||||
queued += list.Len()
|
||||
}
|
||||
if queued > int(maxQueuedInTotal) {
|
||||
t.Fatalf("total transactions overflow allowance: %d > %d", queued, maxQueuedInTotal)
|
||||
}
|
||||
}
|
||||
|
||||
// Tests that if an account remains idle for a prolonged amount of time, any
|
||||
// non-executable transactions queued up are dropped to prevent wasting resources
|
||||
// on shuffling them around.
|
||||
func TestTransactionQueueTimeLimiting(t *testing.T) {
|
||||
// Reduce the queue limits to shorten test time
|
||||
defer func(old time.Duration) { maxQueuedLifetime = old }(maxQueuedLifetime)
|
||||
defer func(old time.Duration) { evictionInterval = old }(evictionInterval)
|
||||
maxQueuedLifetime = time.Second
|
||||
evictionInterval = time.Second
|
||||
|
||||
// Create a test account and fund it
|
||||
pool, key := setupTxPool()
|
||||
account, _ := transaction(0, big.NewInt(0), key).From()
|
||||
|
||||
state, _ := pool.currentState()
|
||||
state.AddBalance(account, big.NewInt(1000000))
|
||||
|
||||
// Queue up a batch of transactions
|
||||
for i := uint64(1); i <= maxQueuedPerAccount; i++ {
|
||||
if err := pool.Add(transaction(i, big.NewInt(100000), key)); err != nil {
|
||||
t.Fatalf("tx %d: failed to add transaction: %v", i, err)
|
||||
}
|
||||
}
|
||||
// Wait until at least two expiration cycles hit and make sure the transactions are gone
|
||||
time.Sleep(2 * evictionInterval)
|
||||
if len(pool.queue) > 0 {
|
||||
t.Fatalf("old transactions remained after eviction")
|
||||
}
|
||||
}
|
||||
|
||||
// Tests that even if the transaction count belonging to a single account goes
|
||||
@ -437,17 +561,20 @@ func TestTransactionPendingLimiting(t *testing.T) {
|
||||
state.AddBalance(account, big.NewInt(1000000))
|
||||
|
||||
// Keep queuing up transactions and make sure all above a limit are dropped
|
||||
for i := uint64(0); i < maxQueued+5; i++ {
|
||||
for i := uint64(0); i < maxQueuedPerAccount+5; i++ {
|
||||
if err := pool.Add(transaction(i, big.NewInt(100000), key)); err != nil {
|
||||
t.Fatalf("tx %d: failed to add transaction: %v", i, err)
|
||||
}
|
||||
if len(pool.pending) != int(i)+1 {
|
||||
t.Errorf("tx %d: pending pool size mismatch: have %d, want %d", i, len(pool.pending), i+1)
|
||||
if pool.pending[account].Len() != int(i)+1 {
|
||||
t.Errorf("tx %d: pending pool size mismatch: have %d, want %d", i, pool.pending[account].Len(), i+1)
|
||||
}
|
||||
if len(pool.queue[account]) != 0 {
|
||||
t.Errorf("tx %d: queue size mismatch: have %d, want %d", i, len(pool.queue[account]), 0)
|
||||
if len(pool.queue) != 0 {
|
||||
t.Errorf("tx %d: queue size mismatch: have %d, want %d", i, pool.queue[account].Len(), 0)
|
||||
}
|
||||
}
|
||||
if len(pool.all) != int(maxQueuedPerAccount+5) {
|
||||
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), maxQueuedPerAccount+5)
|
||||
}
|
||||
}
|
||||
|
||||
// Tests that the transaction limits are enforced the same way irrelevant whether
|
||||
@ -462,39 +589,42 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) {
|
||||
state1, _ := pool1.currentState()
|
||||
state1.AddBalance(account1, big.NewInt(1000000))
|
||||
|
||||
for i := uint64(0); i < maxQueued+5; i++ {
|
||||
for i := uint64(0); i < maxQueuedPerAccount+5; i++ {
|
||||
if err := pool1.Add(transaction(origin+i, big.NewInt(100000), key1)); err != nil {
|
||||
t.Fatalf("tx %d: failed to add transaction: %v", i, err)
|
||||
}
|
||||
}
|
||||
// Add a batch of transactions to a pool in one bit batch
|
||||
// Add a batch of transactions to a pool in one big batch
|
||||
pool2, key2 := setupTxPool()
|
||||
account2, _ := transaction(0, big.NewInt(0), key2).From()
|
||||
state2, _ := pool2.currentState()
|
||||
state2.AddBalance(account2, big.NewInt(1000000))
|
||||
|
||||
txns := []*types.Transaction{}
|
||||
for i := uint64(0); i < maxQueued+5; i++ {
|
||||
for i := uint64(0); i < maxQueuedPerAccount+5; i++ {
|
||||
txns = append(txns, transaction(origin+i, big.NewInt(100000), key2))
|
||||
}
|
||||
pool2.AddTransactions(txns)
|
||||
pool2.AddBatch(txns)
|
||||
|
||||
// Ensure the batch optimization honors the same pool mechanics
|
||||
if len(pool1.pending) != len(pool2.pending) {
|
||||
t.Errorf("pending transaction count mismatch: one-by-one algo: %d, batch algo: %d", len(pool1.pending), len(pool2.pending))
|
||||
}
|
||||
if len(pool1.queue[account1]) != len(pool2.queue[account2]) {
|
||||
t.Errorf("queued transaction count mismatch: one-by-one algo: %d, batch algo: %d", len(pool1.queue[account1]), len(pool2.queue[account2]))
|
||||
if len(pool1.queue) != len(pool2.queue) {
|
||||
t.Errorf("queued transaction count mismatch: one-by-one algo: %d, batch algo: %d", len(pool1.queue), len(pool2.queue))
|
||||
}
|
||||
if len(pool1.all) != len(pool2.all) {
|
||||
t.Errorf("total transaction count mismatch: one-by-one algo %d, batch algo %d", len(pool1.all), len(pool2.all))
|
||||
}
|
||||
}
|
||||
|
||||
// Benchmarks the speed of validating the contents of the pending queue of the
|
||||
// transaction pool.
|
||||
func BenchmarkValidatePool100(b *testing.B) { benchmarkValidatePool(b, 100) }
|
||||
func BenchmarkValidatePool1000(b *testing.B) { benchmarkValidatePool(b, 1000) }
|
||||
func BenchmarkValidatePool10000(b *testing.B) { benchmarkValidatePool(b, 10000) }
|
||||
func BenchmarkPendingDemotion100(b *testing.B) { benchmarkPendingDemotion(b, 100) }
|
||||
func BenchmarkPendingDemotion1000(b *testing.B) { benchmarkPendingDemotion(b, 1000) }
|
||||
func BenchmarkPendingDemotion10000(b *testing.B) { benchmarkPendingDemotion(b, 10000) }
|
||||
|
||||
func benchmarkValidatePool(b *testing.B, size int) {
|
||||
func benchmarkPendingDemotion(b *testing.B, size int) {
|
||||
// Add a batch of transactions to a pool one by one
|
||||
pool, key := setupTxPool()
|
||||
account, _ := transaction(0, big.NewInt(0), key).From()
|
||||
@ -503,22 +633,22 @@ func benchmarkValidatePool(b *testing.B, size int) {
|
||||
|
||||
for i := 0; i < size; i++ {
|
||||
tx := transaction(uint64(i), big.NewInt(100000), key)
|
||||
pool.addTx(tx.Hash(), account, tx)
|
||||
pool.promoteTx(account, tx.Hash(), tx)
|
||||
}
|
||||
// Benchmark the speed of pool validation
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
pool.validatePool()
|
||||
pool.demoteUnexecutables()
|
||||
}
|
||||
}
|
||||
|
||||
// Benchmarks the speed of scheduling the contents of the future queue of the
|
||||
// transaction pool.
|
||||
func BenchmarkCheckQueue100(b *testing.B) { benchmarkCheckQueue(b, 100) }
|
||||
func BenchmarkCheckQueue1000(b *testing.B) { benchmarkCheckQueue(b, 1000) }
|
||||
func BenchmarkCheckQueue10000(b *testing.B) { benchmarkCheckQueue(b, 10000) }
|
||||
func BenchmarkFuturePromotion100(b *testing.B) { benchmarkFuturePromotion(b, 100) }
|
||||
func BenchmarkFuturePromotion1000(b *testing.B) { benchmarkFuturePromotion(b, 1000) }
|
||||
func BenchmarkFuturePromotion10000(b *testing.B) { benchmarkFuturePromotion(b, 10000) }
|
||||
|
||||
func benchmarkCheckQueue(b *testing.B, size int) {
|
||||
func benchmarkFuturePromotion(b *testing.B, size int) {
|
||||
// Add a batch of transactions to a pool one by one
|
||||
pool, key := setupTxPool()
|
||||
account, _ := transaction(0, big.NewInt(0), key).From()
|
||||
@ -527,11 +657,56 @@ func benchmarkCheckQueue(b *testing.B, size int) {
|
||||
|
||||
for i := 0; i < size; i++ {
|
||||
tx := transaction(uint64(1+i), big.NewInt(100000), key)
|
||||
pool.queueTx(tx.Hash(), tx)
|
||||
pool.enqueueTx(tx.Hash(), tx)
|
||||
}
|
||||
// Benchmark the speed of pool validation
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
pool.checkQueue()
|
||||
pool.promoteExecutables()
|
||||
}
|
||||
}
|
||||
|
||||
// Benchmarks the speed of iterative transaction insertion.
|
||||
func BenchmarkPoolInsert(b *testing.B) {
|
||||
// Generate a batch of transactions to enqueue into the pool
|
||||
pool, key := setupTxPool()
|
||||
account, _ := transaction(0, big.NewInt(0), key).From()
|
||||
state, _ := pool.currentState()
|
||||
state.AddBalance(account, big.NewInt(1000000))
|
||||
|
||||
txs := make(types.Transactions, b.N)
|
||||
for i := 0; i < b.N; i++ {
|
||||
txs[i] = transaction(uint64(i), big.NewInt(100000), key)
|
||||
}
|
||||
// Benchmark importing the transactions into the queue
|
||||
b.ResetTimer()
|
||||
for _, tx := range txs {
|
||||
pool.Add(tx)
|
||||
}
|
||||
}
|
||||
|
||||
// Benchmarks the speed of batched transaction insertion.
|
||||
func BenchmarkPoolBatchInsert100(b *testing.B) { benchmarkPoolBatchInsert(b, 100) }
|
||||
func BenchmarkPoolBatchInsert1000(b *testing.B) { benchmarkPoolBatchInsert(b, 1000) }
|
||||
func BenchmarkPoolBatchInsert10000(b *testing.B) { benchmarkPoolBatchInsert(b, 10000) }
|
||||
|
||||
func benchmarkPoolBatchInsert(b *testing.B, size int) {
|
||||
// Generate a batch of transactions to enqueue into the pool
|
||||
pool, key := setupTxPool()
|
||||
account, _ := transaction(0, big.NewInt(0), key).From()
|
||||
state, _ := pool.currentState()
|
||||
state.AddBalance(account, big.NewInt(1000000))
|
||||
|
||||
batches := make([]types.Transactions, b.N)
|
||||
for i := 0; i < b.N; i++ {
|
||||
batches[i] = make(types.Transactions, size)
|
||||
for j := 0; j < size; j++ {
|
||||
batches[i][j] = transaction(uint64(size*i+j), big.NewInt(100000), key)
|
||||
}
|
||||
}
|
||||
// Benchmark importing the transactions into the queue
|
||||
b.ResetTimer()
|
||||
for _, batch := range batches {
|
||||
pool.AddBatch(batch)
|
||||
}
|
||||
}
|
||||
|
@ -24,7 +24,6 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"math/big"
|
||||
"sort"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
@ -427,49 +426,58 @@ func (s *TxByPrice) Pop() interface{} {
|
||||
return x
|
||||
}
|
||||
|
||||
// SortByPriceAndNonce sorts the transactions by price in such a way that the
|
||||
// nonce orderings within a single account are maintained.
|
||||
// TransactionsByPriceAndNonce represents a set of transactions that can return
|
||||
// transactions in a profit-maximising sorted order, while supporting removing
|
||||
// entire batches of transactions for non-executable accounts.
|
||||
type TransactionsByPriceAndNonce struct {
|
||||
txs map[common.Address]Transactions // Per account nonce-sorted list of transactions
|
||||
heads TxByPrice // Next transaction for each unique account (price heap)
|
||||
}
|
||||
|
||||
// NewTransactionsByPriceAndNonce creates a transaction set that can retrieve
|
||||
// price sorted transactions in a nonce-honouring way.
|
||||
//
|
||||
// Note, this is not as trivial as it seems from the first look as there are three
|
||||
// different criteria that need to be taken into account (price, nonce, account
|
||||
// match), which cannot be done with any plain sorting method, as certain items
|
||||
// cannot be compared without context.
|
||||
//
|
||||
// This method first sorts the separates the list of transactions into individual
|
||||
// sender accounts and sorts them by nonce. After the account nonce ordering is
|
||||
// satisfied, the results are merged back together by price, always comparing only
|
||||
// the head transaction from each account. This is done via a heap to keep it fast.
|
||||
func SortByPriceAndNonce(txs []*Transaction) {
|
||||
// Separate the transactions by account and sort by nonce
|
||||
byNonce := make(map[common.Address][]*Transaction)
|
||||
for _, tx := range txs {
|
||||
acc, _ := tx.From() // we only sort valid txs so this cannot fail
|
||||
byNonce[acc] = append(byNonce[acc], tx)
|
||||
}
|
||||
for _, accTxs := range byNonce {
|
||||
sort.Sort(TxByNonce(accTxs))
|
||||
}
|
||||
// Note, the input map is reowned so the caller should not interact any more with
|
||||
// if after providng it to the constructor.
|
||||
func NewTransactionsByPriceAndNonce(txs map[common.Address]Transactions) *TransactionsByPriceAndNonce {
|
||||
// Initialize a price based heap with the head transactions
|
||||
byPrice := make(TxByPrice, 0, len(byNonce))
|
||||
for acc, accTxs := range byNonce {
|
||||
byPrice = append(byPrice, accTxs[0])
|
||||
byNonce[acc] = accTxs[1:]
|
||||
heads := make(TxByPrice, 0, len(txs))
|
||||
for acc, accTxs := range txs {
|
||||
heads = append(heads, accTxs[0])
|
||||
txs[acc] = accTxs[1:]
|
||||
}
|
||||
heap.Init(&byPrice)
|
||||
heap.Init(&heads)
|
||||
|
||||
// Merge by replacing the best with the next from the same account
|
||||
txs = txs[:0]
|
||||
for len(byPrice) > 0 {
|
||||
// Retrieve the next best transaction by price
|
||||
best := heap.Pop(&byPrice).(*Transaction)
|
||||
|
||||
// Push in its place the next transaction from the same account
|
||||
acc, _ := best.From() // we only sort valid txs so this cannot fail
|
||||
if accTxs, ok := byNonce[acc]; ok && len(accTxs) > 0 {
|
||||
heap.Push(&byPrice, accTxs[0])
|
||||
byNonce[acc] = accTxs[1:]
|
||||
}
|
||||
// Accumulate the best priced transaction
|
||||
txs = append(txs, best)
|
||||
// Assemble and return the transaction set
|
||||
return &TransactionsByPriceAndNonce{
|
||||
txs: txs,
|
||||
heads: heads,
|
||||
}
|
||||
}
|
||||
|
||||
// Peek returns the next transaction by price.
|
||||
func (t *TransactionsByPriceAndNonce) Peek() *Transaction {
|
||||
if len(t.heads) == 0 {
|
||||
return nil
|
||||
}
|
||||
return t.heads[0]
|
||||
}
|
||||
|
||||
// Shift replaces the current best head with the next one from the same account.
|
||||
func (t *TransactionsByPriceAndNonce) Shift() {
|
||||
acc, _ := t.heads[0].From() // we only sort valid txs so this cannot fail
|
||||
|
||||
if txs, ok := t.txs[acc]; ok && len(txs) > 0 {
|
||||
t.heads[0], t.txs[acc] = txs[0], txs[1:]
|
||||
heap.Fix(&t.heads, 0)
|
||||
} else {
|
||||
heap.Pop(&t.heads)
|
||||
}
|
||||
}
|
||||
|
||||
// Pop removes the best transaction, *not* replacing it with the next one from
|
||||
// the same account. This should be used when a transaction cannot be executed
|
||||
// and hence all subsequent ones should be discarded from the same account.
|
||||
func (t *TransactionsByPriceAndNonce) Pop() {
|
||||
heap.Pop(&t.heads)
|
||||
}
|
||||
|
@ -128,15 +128,25 @@ func TestTransactionPriceNonceSort(t *testing.T) {
|
||||
keys[i], _ = crypto.GenerateKey()
|
||||
}
|
||||
// Generate a batch of transactions with overlapping values, but shifted nonces
|
||||
txs := []*Transaction{}
|
||||
groups := map[common.Address]Transactions{}
|
||||
for start, key := range keys {
|
||||
addr := crypto.PubkeyToAddress(key.PublicKey)
|
||||
for i := 0; i < 25; i++ {
|
||||
tx, _ := NewTransaction(uint64(start+i), common.Address{}, big.NewInt(100), big.NewInt(100), big.NewInt(int64(start+i)), nil).SignECDSA(key)
|
||||
txs = append(txs, tx)
|
||||
groups[addr] = append(groups[addr], tx)
|
||||
}
|
||||
}
|
||||
// Sort the transactions and cross check the nonce ordering
|
||||
SortByPriceAndNonce(txs)
|
||||
txset := NewTransactionsByPriceAndNonce(groups)
|
||||
|
||||
txs := Transactions{}
|
||||
for {
|
||||
if tx := txset.Peek(); tx != nil {
|
||||
txs = append(txs, tx)
|
||||
txset.Shift()
|
||||
}
|
||||
break
|
||||
}
|
||||
for i, txi := range txs {
|
||||
fromi, _ := txi.From()
|
||||
|
||||
|
@ -118,21 +118,25 @@ func (b *EthApiBackend) RemoveTx(txHash common.Hash) {
|
||||
b.eth.txMu.Lock()
|
||||
defer b.eth.txMu.Unlock()
|
||||
|
||||
b.eth.txPool.RemoveTx(txHash)
|
||||
b.eth.txPool.Remove(txHash)
|
||||
}
|
||||
|
||||
func (b *EthApiBackend) GetPoolTransactions() types.Transactions {
|
||||
b.eth.txMu.Lock()
|
||||
defer b.eth.txMu.Unlock()
|
||||
|
||||
return b.eth.txPool.GetTransactions()
|
||||
var txs types.Transactions
|
||||
for _, batch := range b.eth.txPool.Pending() {
|
||||
txs = append(txs, batch...)
|
||||
}
|
||||
return txs
|
||||
}
|
||||
|
||||
func (b *EthApiBackend) GetPoolTransaction(txHash common.Hash) *types.Transaction {
|
||||
func (b *EthApiBackend) GetPoolTransaction(hash common.Hash) *types.Transaction {
|
||||
b.eth.txMu.Lock()
|
||||
defer b.eth.txMu.Unlock()
|
||||
|
||||
return b.eth.txPool.GetTransaction(txHash)
|
||||
return b.eth.txPool.Get(hash)
|
||||
}
|
||||
|
||||
func (b *EthApiBackend) GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) {
|
||||
@ -149,7 +153,7 @@ func (b *EthApiBackend) Stats() (pending int, queued int) {
|
||||
return b.eth.txPool.Stats()
|
||||
}
|
||||
|
||||
func (b *EthApiBackend) TxPoolContent() (map[common.Address]map[uint64][]*types.Transaction, map[common.Address]map[uint64][]*types.Transaction) {
|
||||
func (b *EthApiBackend) TxPoolContent() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) {
|
||||
b.eth.txMu.Lock()
|
||||
defer b.eth.txMu.Unlock()
|
||||
|
||||
|
@ -677,7 +677,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
|
||||
}
|
||||
p.MarkTransaction(tx.Hash())
|
||||
}
|
||||
pm.txpool.AddTransactions(txs)
|
||||
pm.txpool.AddBatch(txs)
|
||||
|
||||
default:
|
||||
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
"crypto/ecdsa"
|
||||
"crypto/rand"
|
||||
"math/big"
|
||||
"sort"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
@ -89,9 +90,9 @@ type testTxPool struct {
|
||||
lock sync.RWMutex // Protects the transaction pool
|
||||
}
|
||||
|
||||
// AddTransactions appends a batch of transactions to the pool, and notifies any
|
||||
// AddBatch appends a batch of transactions to the pool, and notifies any
|
||||
// listeners if the addition channel is non nil
|
||||
func (p *testTxPool) AddTransactions(txs []*types.Transaction) {
|
||||
func (p *testTxPool) AddBatch(txs []*types.Transaction) {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
|
||||
@ -101,15 +102,20 @@ func (p *testTxPool) AddTransactions(txs []*types.Transaction) {
|
||||
}
|
||||
}
|
||||
|
||||
// GetTransactions returns all the transactions known to the pool
|
||||
func (p *testTxPool) GetTransactions() types.Transactions {
|
||||
// Pending returns all the transactions known to the pool
|
||||
func (p *testTxPool) Pending() map[common.Address]types.Transactions {
|
||||
p.lock.RLock()
|
||||
defer p.lock.RUnlock()
|
||||
|
||||
txs := make([]*types.Transaction, len(p.pool))
|
||||
copy(txs, p.pool)
|
||||
|
||||
return txs
|
||||
batches := make(map[common.Address]types.Transactions)
|
||||
for _, tx := range p.pool {
|
||||
from, _ := tx.From()
|
||||
batches[from] = append(batches[from], tx)
|
||||
}
|
||||
for _, batch := range batches {
|
||||
sort.Sort(types.TxByNonce(batch))
|
||||
}
|
||||
return batches
|
||||
}
|
||||
|
||||
// newTestTransaction create a new dummy transaction.
|
||||
|
@ -97,12 +97,12 @@ var errorToString = map[int]string{
|
||||
}
|
||||
|
||||
type txPool interface {
|
||||
// AddTransactions should add the given transactions to the pool.
|
||||
AddTransactions([]*types.Transaction)
|
||||
// AddBatch should add the given transactions to the pool.
|
||||
AddBatch([]*types.Transaction)
|
||||
|
||||
// GetTransactions should return pending transactions.
|
||||
// Pending should return pending transactions.
|
||||
// The slice should be modifiable by the caller.
|
||||
GetTransactions() types.Transactions
|
||||
Pending() map[common.Address]types.Transactions
|
||||
}
|
||||
|
||||
// statusData is the network packet for the status message.
|
||||
|
@ -130,7 +130,7 @@ func testSendTransactions(t *testing.T, protocol int) {
|
||||
for nonce := range alltxs {
|
||||
alltxs[nonce] = newTestTransaction(testAccount, uint64(nonce), txsize)
|
||||
}
|
||||
pm.txpool.AddTransactions(alltxs)
|
||||
pm.txpool.AddBatch(alltxs)
|
||||
|
||||
// Connect several peers. They should all receive the pending transactions.
|
||||
var wg sync.WaitGroup
|
||||
|
@ -45,7 +45,10 @@ type txsync struct {
|
||||
|
||||
// syncTransactions starts sending all currently pending transactions to the given peer.
|
||||
func (pm *ProtocolManager) syncTransactions(p *peer) {
|
||||
txs := pm.txpool.GetTransactions()
|
||||
var txs types.Transactions
|
||||
for _, batch := range pm.txpool.Pending() {
|
||||
txs = append(txs, batch...)
|
||||
}
|
||||
if len(txs) == 0 {
|
||||
return
|
||||
}
|
||||
|
@ -100,32 +100,26 @@ func NewPublicTxPoolAPI(b Backend) *PublicTxPoolAPI {
|
||||
}
|
||||
|
||||
// Content returns the transactions contained within the transaction pool.
|
||||
func (s *PublicTxPoolAPI) Content() map[string]map[string]map[string][]*RPCTransaction {
|
||||
content := map[string]map[string]map[string][]*RPCTransaction{
|
||||
"pending": make(map[string]map[string][]*RPCTransaction),
|
||||
"queued": make(map[string]map[string][]*RPCTransaction),
|
||||
func (s *PublicTxPoolAPI) Content() map[string]map[string]map[string]*RPCTransaction {
|
||||
content := map[string]map[string]map[string]*RPCTransaction{
|
||||
"pending": make(map[string]map[string]*RPCTransaction),
|
||||
"queued": make(map[string]map[string]*RPCTransaction),
|
||||
}
|
||||
pending, queue := s.b.TxPoolContent()
|
||||
|
||||
// Flatten the pending transactions
|
||||
for account, batches := range pending {
|
||||
dump := make(map[string][]*RPCTransaction)
|
||||
for nonce, txs := range batches {
|
||||
nonce := fmt.Sprintf("%d", nonce)
|
||||
for _, tx := range txs {
|
||||
dump[nonce] = append(dump[nonce], newRPCPendingTransaction(tx))
|
||||
}
|
||||
for account, txs := range pending {
|
||||
dump := make(map[string]*RPCTransaction)
|
||||
for nonce, tx := range txs {
|
||||
dump[fmt.Sprintf("%d", nonce)] = newRPCPendingTransaction(tx)
|
||||
}
|
||||
content["pending"][account.Hex()] = dump
|
||||
}
|
||||
// Flatten the queued transactions
|
||||
for account, batches := range queue {
|
||||
dump := make(map[string][]*RPCTransaction)
|
||||
for nonce, txs := range batches {
|
||||
nonce := fmt.Sprintf("%d", nonce)
|
||||
for _, tx := range txs {
|
||||
dump[nonce] = append(dump[nonce], newRPCPendingTransaction(tx))
|
||||
}
|
||||
for account, txs := range queue {
|
||||
dump := make(map[string]*RPCTransaction)
|
||||
for nonce, tx := range txs {
|
||||
dump[fmt.Sprintf("%d", nonce)] = newRPCPendingTransaction(tx)
|
||||
}
|
||||
content["queued"][account.Hex()] = dump
|
||||
}
|
||||
@ -143,10 +137,10 @@ func (s *PublicTxPoolAPI) Status() map[string]*rpc.HexNumber {
|
||||
|
||||
// Inspect retrieves the content of the transaction pool and flattens it into an
|
||||
// easily inspectable list.
|
||||
func (s *PublicTxPoolAPI) Inspect() map[string]map[string]map[string][]string {
|
||||
content := map[string]map[string]map[string][]string{
|
||||
"pending": make(map[string]map[string][]string),
|
||||
"queued": make(map[string]map[string][]string),
|
||||
func (s *PublicTxPoolAPI) Inspect() map[string]map[string]map[string]string {
|
||||
content := map[string]map[string]map[string]string{
|
||||
"pending": make(map[string]map[string]string),
|
||||
"queued": make(map[string]map[string]string),
|
||||
}
|
||||
pending, queue := s.b.TxPoolContent()
|
||||
|
||||
@ -158,24 +152,18 @@ func (s *PublicTxPoolAPI) Inspect() map[string]map[string]map[string][]string {
|
||||
return fmt.Sprintf("contract creation: %v wei + %v × %v gas", tx.Value(), tx.Gas(), tx.GasPrice())
|
||||
}
|
||||
// Flatten the pending transactions
|
||||
for account, batches := range pending {
|
||||
dump := make(map[string][]string)
|
||||
for nonce, txs := range batches {
|
||||
nonce := fmt.Sprintf("%d", nonce)
|
||||
for _, tx := range txs {
|
||||
dump[nonce] = append(dump[nonce], format(tx))
|
||||
}
|
||||
for account, txs := range pending {
|
||||
dump := make(map[string]string)
|
||||
for nonce, tx := range txs {
|
||||
dump[fmt.Sprintf("%d", nonce)] = format(tx)
|
||||
}
|
||||
content["pending"][account.Hex()] = dump
|
||||
}
|
||||
// Flatten the queued transactions
|
||||
for account, batches := range queue {
|
||||
dump := make(map[string][]string)
|
||||
for nonce, txs := range batches {
|
||||
nonce := fmt.Sprintf("%d", nonce)
|
||||
for _, tx := range txs {
|
||||
dump[nonce] = append(dump[nonce], format(tx))
|
||||
}
|
||||
for account, txs := range queue {
|
||||
dump := make(map[string]string)
|
||||
for nonce, tx := range txs {
|
||||
dump[fmt.Sprintf("%d", nonce)] = format(tx)
|
||||
}
|
||||
content["queued"][account.Hex()] = dump
|
||||
}
|
||||
|
@ -58,7 +58,7 @@ type Backend interface {
|
||||
GetPoolTransaction(txHash common.Hash) *types.Transaction
|
||||
GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error)
|
||||
Stats() (pending int, queued int)
|
||||
TxPoolContent() (map[common.Address]map[uint64][]*types.Transaction, map[common.Address]map[uint64][]*types.Transaction)
|
||||
TxPoolContent() (map[common.Address]types.Transactions, map[common.Address]types.Transactions)
|
||||
}
|
||||
|
||||
type State interface {
|
||||
|
136
miner/worker.go
136
miner/worker.go
@ -63,18 +63,16 @@ type uint64RingBuffer struct {
|
||||
// Work is the workers current environment and holds
|
||||
// all of the current state information
|
||||
type Work struct {
|
||||
config *core.ChainConfig
|
||||
state *state.StateDB // apply state changes here
|
||||
ancestors *set.Set // ancestor set (used for checking uncle parent validity)
|
||||
family *set.Set // family set (used for checking uncle invalidity)
|
||||
uncles *set.Set // uncle set
|
||||
remove *set.Set // tx which will be removed
|
||||
tcount int // tx count in cycle
|
||||
ignoredTransactors *set.Set
|
||||
lowGasTransactors *set.Set
|
||||
ownedAccounts *set.Set
|
||||
lowGasTxs types.Transactions
|
||||
localMinedBlocks *uint64RingBuffer // the most recent block numbers that were mined locally (used to check block inclusion)
|
||||
config *core.ChainConfig
|
||||
state *state.StateDB // apply state changes here
|
||||
ancestors *set.Set // ancestor set (used for checking uncle parent validity)
|
||||
family *set.Set // family set (used for checking uncle invalidity)
|
||||
uncles *set.Set // uncle set
|
||||
tcount int // tx count in cycle
|
||||
ownedAccounts *set.Set
|
||||
lowGasTxs types.Transactions
|
||||
failedTxs types.Transactions
|
||||
localMinedBlocks *uint64RingBuffer // the most recent block numbers that were mined locally (used to check block inclusion)
|
||||
|
||||
Block *types.Block // the new block
|
||||
|
||||
@ -236,7 +234,12 @@ func (self *worker) update() {
|
||||
// Apply transaction to the pending state if we're not mining
|
||||
if atomic.LoadInt32(&self.mining) == 0 {
|
||||
self.currentMu.Lock()
|
||||
self.current.commitTransactions(self.mux, types.Transactions{ev.Tx}, self.gasPrice, self.chain)
|
||||
|
||||
acc, _ := ev.Tx.From()
|
||||
txs := map[common.Address]types.Transactions{acc: types.Transactions{ev.Tx}}
|
||||
txset := types.NewTransactionsByPriceAndNonce(txs)
|
||||
|
||||
self.current.commitTransactions(self.mux, txset, self.gasPrice, self.chain)
|
||||
self.currentMu.Unlock()
|
||||
}
|
||||
}
|
||||
@ -383,10 +386,7 @@ func (self *worker) makeCurrent(parent *types.Block, header *types.Header) error
|
||||
accounts := self.eth.AccountManager().Accounts()
|
||||
|
||||
// Keep track of transactions which return errors so they can be removed
|
||||
work.remove = set.New()
|
||||
work.tcount = 0
|
||||
work.ignoredTransactors = set.New()
|
||||
work.lowGasTransactors = set.New()
|
||||
work.ownedAccounts = accountAddressesSet(accounts)
|
||||
if self.current != nil {
|
||||
work.localMinedBlocks = self.current.localMinedBlocks
|
||||
@ -495,45 +495,11 @@ func (self *worker) commitNewWork() {
|
||||
if self.config.DAOForkSupport && self.config.DAOForkBlock != nil && self.config.DAOForkBlock.Cmp(header.Number) == 0 {
|
||||
core.ApplyDAOHardFork(work.state)
|
||||
}
|
||||
txs := types.NewTransactionsByPriceAndNonce(self.eth.TxPool().Pending())
|
||||
work.commitTransactions(self.mux, txs, self.gasPrice, self.chain)
|
||||
|
||||
/* //approach 1
|
||||
transactions := self.eth.TxPool().GetTransactions()
|
||||
sort.Sort(types.TxByNonce(transactions))
|
||||
*/
|
||||
|
||||
//approach 2
|
||||
transactions := self.eth.TxPool().GetTransactions()
|
||||
types.SortByPriceAndNonce(transactions)
|
||||
|
||||
/* // approach 3
|
||||
// commit transactions for this run.
|
||||
txPerOwner := make(map[common.Address]types.Transactions)
|
||||
// Sort transactions by owner
|
||||
for _, tx := range self.eth.TxPool().GetTransactions() {
|
||||
from, _ := tx.From() // we can ignore the sender error
|
||||
txPerOwner[from] = append(txPerOwner[from], tx)
|
||||
}
|
||||
var (
|
||||
singleTxOwner types.Transactions
|
||||
multiTxOwner types.Transactions
|
||||
)
|
||||
// Categorise transactions by
|
||||
// 1. 1 owner tx per block
|
||||
// 2. multi txs owner per block
|
||||
for _, txs := range txPerOwner {
|
||||
if len(txs) == 1 {
|
||||
singleTxOwner = append(singleTxOwner, txs[0])
|
||||
} else {
|
||||
multiTxOwner = append(multiTxOwner, txs...)
|
||||
}
|
||||
}
|
||||
sort.Sort(types.TxByPrice(singleTxOwner))
|
||||
sort.Sort(types.TxByNonce(multiTxOwner))
|
||||
transactions := append(singleTxOwner, multiTxOwner...)
|
||||
*/
|
||||
|
||||
work.commitTransactions(self.mux, transactions, self.gasPrice, self.chain)
|
||||
self.eth.TxPool().RemoveTransactions(work.lowGasTxs)
|
||||
self.eth.TxPool().RemoveBatch(work.lowGasTxs)
|
||||
self.eth.TxPool().RemoveBatch(work.failedTxs)
|
||||
|
||||
// compute uncles for the new block.
|
||||
var (
|
||||
@ -591,65 +557,51 @@ func (self *worker) commitUncle(work *Work, uncle *types.Header) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (env *Work) commitTransactions(mux *event.TypeMux, transactions types.Transactions, gasPrice *big.Int, bc *core.BlockChain) {
|
||||
func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsByPriceAndNonce, gasPrice *big.Int, bc *core.BlockChain) {
|
||||
gp := new(core.GasPool).AddGas(env.header.GasLimit)
|
||||
|
||||
var coalescedLogs vm.Logs
|
||||
for _, tx := range transactions {
|
||||
for {
|
||||
// Retrieve the next transaction and abort if all done
|
||||
tx := txs.Peek()
|
||||
if tx == nil {
|
||||
break
|
||||
}
|
||||
// Error may be ignored here. The error has already been checked
|
||||
// during transaction acceptance is the transaction pool.
|
||||
from, _ := tx.From()
|
||||
|
||||
// Check if it falls within margin. Txs from owned accounts are always processed.
|
||||
// Ignore any transactions (and accounts subsequently) with low gas limits
|
||||
if tx.GasPrice().Cmp(gasPrice) < 0 && !env.ownedAccounts.Has(from) {
|
||||
// ignore the transaction and transactor. We ignore the transactor
|
||||
// because nonce will fail after ignoring this transaction so there's
|
||||
// no point
|
||||
env.lowGasTransactors.Add(from)
|
||||
// Pop the current low-priced transaction without shifting in the next from the account
|
||||
glog.V(logger.Info).Infof("Transaction (%x) below gas price (tx=%v ask=%v). All sequential txs from this address(%x) will be ignored\n", tx.Hash().Bytes()[:4], common.CurrencyToString(tx.GasPrice()), common.CurrencyToString(gasPrice), from[:4])
|
||||
|
||||
glog.V(logger.Info).Infof("transaction(%x) below gas price (tx=%v ask=%v). All sequential txs from this address(%x) will be ignored\n", tx.Hash().Bytes()[:4], common.CurrencyToString(tx.GasPrice()), common.CurrencyToString(gasPrice), from[:4])
|
||||
}
|
||||
env.lowGasTxs = append(env.lowGasTxs, tx)
|
||||
txs.Pop()
|
||||
|
||||
// Continue with the next transaction if the transaction sender is included in
|
||||
// the low gas tx set. This will also remove the tx and all sequential transaction
|
||||
// from this transactor
|
||||
if env.lowGasTransactors.Has(from) {
|
||||
// add tx to the low gas set. This will be removed at the end of the run
|
||||
// owned accounts are ignored
|
||||
if !env.ownedAccounts.Has(from) {
|
||||
env.lowGasTxs = append(env.lowGasTxs, tx)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Move on to the next transaction when the transactor is in ignored transactions set
|
||||
// This may occur when a transaction hits the gas limit. When a gas limit is hit and
|
||||
// the transaction is processed (that could potentially be included in the block) it
|
||||
// will throw a nonce error because the previous transaction hasn't been processed.
|
||||
// Therefor we need to ignore any transaction after the ignored one.
|
||||
if env.ignoredTransactors.Has(from) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Start executing the transaction
|
||||
env.state.StartRecord(tx.Hash(), common.Hash{}, 0)
|
||||
|
||||
err, logs := env.commitTransaction(tx, bc, gp)
|
||||
switch {
|
||||
case core.IsGasLimitErr(err):
|
||||
// ignore the transactor so no nonce errors will be thrown for this account
|
||||
// next time the worker is run, they'll be picked up again.
|
||||
env.ignoredTransactors.Add(from)
|
||||
|
||||
// Pop the current out-of-gas transaction without shifting in the next from the account
|
||||
glog.V(logger.Detail).Infof("Gas limit reached for (%x) in this block. Continue to try smaller txs\n", from[:4])
|
||||
case err != nil:
|
||||
env.remove.Add(tx.Hash())
|
||||
txs.Pop()
|
||||
|
||||
case err != nil:
|
||||
// Pop the current failed transaction without shifting in the next from the account
|
||||
glog.V(logger.Detail).Infof("Transaction (%x) failed, will be removed: %v\n", tx.Hash().Bytes()[:4], err)
|
||||
env.failedTxs = append(env.failedTxs, tx)
|
||||
txs.Pop()
|
||||
|
||||
if glog.V(logger.Detail) {
|
||||
glog.Infof("TX (%x) failed, will be removed: %v\n", tx.Hash().Bytes()[:4], err)
|
||||
}
|
||||
default:
|
||||
env.tcount++
|
||||
// Everything ok, collect the logs and shift in the next transaction from the same account
|
||||
coalescedLogs = append(coalescedLogs, logs...)
|
||||
env.tcount++
|
||||
txs.Shift()
|
||||
}
|
||||
}
|
||||
if len(coalescedLogs) > 0 || env.tcount > 0 {
|
||||
|
Loading…
Reference in New Issue
Block a user