From 3d32690b54539d13ec8e7884bf3416ada6046354 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 28 Jul 2017 15:09:39 +0200 Subject: [PATCH] cmd, core, eth: journal local transactions to disk (#14784) * core: reduce txpool event loop goroutines and sync structs * cmd, core, eth: journal local transactions to disk * core: journal replacement pending transactions too * core: separate transaction journal from pool --- cmd/geth/main.go | 2 + cmd/geth/usage.go | 2 + cmd/utils/flags.go | 16 ++++ core/tx_journal.go | 150 +++++++++++++++++++++++++++++++++++++ core/tx_pool.go | 152 +++++++++++++++++++++++++------------- core/tx_pool_test.go | 172 +++++++++++++++++++++++++++++++++++-------- eth/backend.go | 6 +- 7 files changed, 416 insertions(+), 84 deletions(-) create mode 100644 core/tx_journal.go diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 607414bbb..e89f88ec9 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -67,6 +67,8 @@ var ( utils.EthashDatasetsInMemoryFlag, utils.EthashDatasetsOnDiskFlag, utils.TxPoolNoLocalsFlag, + utils.TxPoolJournalFlag, + utils.TxPoolRejournalFlag, utils.TxPoolPriceLimitFlag, utils.TxPoolPriceBumpFlag, utils.TxPoolAccountSlotsFlag, diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index 275aad674..80861d852 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -96,6 +96,8 @@ var AppHelpFlagGroups = []flagGroup{ Name: "TRANSACTION POOL", Flags: []cli.Flag{ utils.TxPoolNoLocalsFlag, + utils.TxPoolJournalFlag, + utils.TxPoolRejournalFlag, utils.TxPoolPriceLimitFlag, utils.TxPoolPriceBumpFlag, utils.TxPoolAccountSlotsFlag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 0159364af..9f7b76c12 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -213,6 +213,16 @@ var ( Name: "txpool.nolocals", Usage: "Disables price exemptions for locally submitted transactions", } + TxPoolJournalFlag = cli.StringFlag{ + Name: "txpool.journal", + Usage: "Disk journal for local transaction to survive node restarts", + Value: core.DefaultTxPoolConfig.Journal, + } + TxPoolRejournalFlag = cli.DurationFlag{ + Name: "txpool.rejournal", + Usage: "Time interval to regenerate the local transaction journal", + Value: core.DefaultTxPoolConfig.Rejournal, + } TxPoolPriceLimitFlag = cli.Uint64Flag{ Name: "txpool.pricelimit", Usage: "Minimum gas price limit to enforce for acceptance into the pool", @@ -838,6 +848,12 @@ func setTxPool(ctx *cli.Context, cfg *core.TxPoolConfig) { if ctx.GlobalIsSet(TxPoolNoLocalsFlag.Name) { cfg.NoLocals = ctx.GlobalBool(TxPoolNoLocalsFlag.Name) } + if ctx.GlobalIsSet(TxPoolJournalFlag.Name) { + cfg.Journal = ctx.GlobalString(TxPoolJournalFlag.Name) + } + if ctx.GlobalIsSet(TxPoolRejournalFlag.Name) { + cfg.Rejournal = ctx.GlobalDuration(TxPoolRejournalFlag.Name) + } if ctx.GlobalIsSet(TxPoolPriceLimitFlag.Name) { cfg.PriceLimit = ctx.GlobalUint64(TxPoolPriceLimitFlag.Name) } diff --git a/core/tx_journal.go b/core/tx_journal.go new file mode 100644 index 000000000..94a9ff9b8 --- /dev/null +++ b/core/tx_journal.go @@ -0,0 +1,150 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package core + +import ( + "errors" + "io" + "os" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" +) + +// errNoActiveJournal is returned if a transaction is attempted to be inserted +// into the journal, but no such file is currently open. +var errNoActiveJournal = errors.New("no active journal") + +// txJournal is a rotating log of transactions with the aim of storing locally +// created transactions to allow non-executed ones to survive node restarts. +type txJournal struct { + path string // Filesystem path to store the transactions at + writer io.WriteCloser // Output stream to write new transactions into +} + +// newTxJournal creates a new transaction journal to +func newTxJournal(path string) *txJournal { + return &txJournal{ + path: path, + } +} + +// load parses a transaction journal dump from disk, loading its contents into +// the specified pool. +func (journal *txJournal) load(add func(*types.Transaction) error) error { + // Skip the parsing if the journal file doens't exist at all + if _, err := os.Stat(journal.path); os.IsNotExist(err) { + return nil + } + // Open the journal for loading any past transactions + input, err := os.Open(journal.path) + if err != nil { + return err + } + defer input.Close() + + // Inject all transactions from the journal into the pool + stream := rlp.NewStream(input, 0) + total, dropped := 0, 0 + + var failure error + for { + // Parse the next transaction and terminate on error + tx := new(types.Transaction) + if err = stream.Decode(tx); err != nil { + if err != io.EOF { + failure = err + } + break + } + // Import the transaction and bump the appropriate progress counters + total++ + if err = add(tx); err != nil { + log.Debug("Failed to add journaled transaction", "err", err) + dropped++ + continue + } + } + log.Info("Loaded local transaction journal", "transactions", total, "dropped", dropped) + + return failure +} + +// insert adds the specified transaction to the local disk journal. +func (journal *txJournal) insert(tx *types.Transaction) error { + if journal.writer == nil { + return errNoActiveJournal + } + if err := rlp.Encode(journal.writer, tx); err != nil { + return err + } + return nil +} + +// rotate regenerates the transaction journal based on the current contents of +// the transaction pool. +func (journal *txJournal) rotate(all map[common.Address]types.Transactions) error { + // Close the current journal (if any is open) + if journal.writer != nil { + if err := journal.writer.Close(); err != nil { + return err + } + journal.writer = nil + } + // Generate a new journal with the contents of the current pool + replacement, err := os.OpenFile(journal.path+".new", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0755) + if err != nil { + return err + } + journaled := 0 + for _, txs := range all { + for _, tx := range txs { + if err = rlp.Encode(replacement, tx); err != nil { + replacement.Close() + return err + } + } + journaled += len(txs) + } + replacement.Close() + + // Replace the live journal with the newly generated one + if err = os.Rename(journal.path+".new", journal.path); err != nil { + return err + } + sink, err := os.OpenFile(journal.path, os.O_WRONLY|os.O_APPEND, 0755) + if err != nil { + return err + } + journal.writer = sink + log.Info("Regenerated local transaction journal", "transactions", journaled, "accounts", len(all)) + + return nil +} + +// close flushes the transaction journal contents to disk and closes the file. +func (journal *txJournal) close() error { + var err error + + if journal.writer != nil { + err = journal.writer.Close() + journal.writer = nil + } + return err +} diff --git a/core/tx_pool.go b/core/tx_pool.go index 8e2d1b31d..16f774265 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -99,7 +99,9 @@ type stateFn func() (*state.StateDB, error) // TxPoolConfig are the configuration parameters of the transaction pool. type TxPoolConfig struct { - NoLocals bool // Whether local transaction handling should be disabled + NoLocals bool // Whether local transaction handling should be disabled + Journal string // Journal of local transactions to survive node restarts + Rejournal time.Duration // Time interval to regenerate the local transaction journal PriceLimit uint64 // Minimum gas price to enforce for acceptance into the pool PriceBump uint64 // Minimum price bump percentage to replace an already existing transaction (nonce) @@ -115,6 +117,9 @@ type TxPoolConfig struct { // DefaultTxPoolConfig contains the default configurations for the transaction // pool. var DefaultTxPoolConfig = TxPoolConfig{ + Journal: "transactions.rlp", + Rejournal: time.Hour, + PriceLimit: 1, PriceBump: 10, @@ -130,6 +135,10 @@ var DefaultTxPoolConfig = TxPoolConfig{ // unreasonable or unworkable. func (config *TxPoolConfig) sanitize() TxPoolConfig { conf := *config + if conf.Rejournal < time.Second { + log.Warn("Sanitizing invalid txpool journal time", "provided", conf.Rejournal, "updated", time.Second) + conf.Rejournal = time.Second + } if conf.PriceLimit < 1 { log.Warn("Sanitizing invalid txpool price limit", "provided", conf.PriceLimit, "updated", DefaultTxPoolConfig.PriceLimit) conf.PriceLimit = DefaultTxPoolConfig.PriceLimit @@ -157,18 +166,19 @@ type TxPool struct { gasPrice *big.Int eventMux *event.TypeMux events *event.TypeMuxSubscription - locals *accountSet signer types.Signer mu sync.RWMutex + locals *accountSet // Set of local transaction to exepmt from evicion rules + journal *txJournal // Journal of local transaction to back up to disk + pending map[common.Address]*txList // All currently processable transactions queue map[common.Address]*txList // Queued but non-processable transactions beats map[common.Address]time.Time // Last heartbeat from each known account all map[common.Hash]*types.Transaction // All transactions to allow lookups priced *txPricedList // All transactions sorted by price - wg sync.WaitGroup // for shutdown sync - quit chan struct{} + wg sync.WaitGroup // for shutdown sync homestead bool } @@ -194,32 +204,48 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, eventMux *e gasPrice: new(big.Int).SetUint64(config.PriceLimit), pendingState: nil, events: eventMux.Subscribe(ChainHeadEvent{}, RemovedTransactionEvent{}), - quit: make(chan struct{}), } pool.locals = newAccountSet(pool.signer) pool.priced = newTxPricedList(&pool.all) pool.resetState() - // Start the various events loops and return - pool.wg.Add(2) - go pool.eventLoop() - go pool.expirationLoop() + // If local transactions and journaling is enabled, load from disk + if !config.NoLocals && config.Journal != "" { + pool.journal = newTxJournal(config.Journal) + + if err := pool.journal.load(pool.AddLocal); err != nil { + log.Warn("Failed to load transaction journal", "err", err) + } + if err := pool.journal.rotate(pool.local()); err != nil { + log.Warn("Failed to rotate transaction journal", "err", err) + } + } + // Start the event loop and return + pool.wg.Add(1) + go pool.loop() return pool } -func (pool *TxPool) eventLoop() { +// loop is the transaction pool's main event loop, waiting for and reacting to +// outside blockchain events as well as for various reporting and transaction +// eviction events. +func (pool *TxPool) loop() { defer pool.wg.Done() - // Start a ticker and keep track of interesting pool stats to report + // Start the stats reporting and transaction eviction tickers var prevPending, prevQueued, prevStales int report := time.NewTicker(statsReportInterval) defer report.Stop() - // Track chain events. When a chain events occurs (new chain canon block) - // we need to know the new state. The new state will help us determine - // the nonces in the managed state + evict := time.NewTicker(evictionInterval) + defer evict.Stop() + + journal := time.NewTicker(pool.config.Rejournal) + defer journal.Stop() + + // Keep waiting for and reacting to the various events for { select { // Handle any events fired by the system @@ -253,6 +279,31 @@ func (pool *TxPool) eventLoop() { log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales) prevPending, prevQueued, prevStales = pending, queued, stales } + + // Handle inactive account transaction eviction + case <-evict.C: + pool.mu.Lock() + for addr := range pool.queue { + // Skip local transactions from the eviction mechanism + if pool.locals.contains(addr) { + continue + } + // Any non-locals old enough should be removed + if time.Since(pool.beats[addr]) > pool.config.Lifetime { + for _, tx := range pool.queue[addr].Flatten() { + pool.removeTx(tx.Hash()) + } + } + } + pool.mu.Unlock() + + // Handle local transaction journal rotation + case <-journal.C: + if pool.journal != nil { + if err := pool.journal.rotate(pool.local()); err != nil { + log.Warn("Failed to rotate local tx journal", "err", err) + } + } } } } @@ -284,9 +335,11 @@ func (pool *TxPool) resetState() { // Stop terminates the transaction pool. func (pool *TxPool) Stop() { pool.events.Unsubscribe() - close(pool.quit) pool.wg.Wait() + if pool.journal != nil { + pool.journal.close() + } log.Info("Transaction pool stopped") } @@ -373,6 +426,22 @@ func (pool *TxPool) Pending() (map[common.Address]types.Transactions, error) { return pending, nil } +// local retrieves all currently known local transactions, groupped by origin +// account and sorted by nonce. The returned transaction set is a copy and can be +// freely modified by calling code. +func (pool *TxPool) local() map[common.Address]types.Transactions { + txs := make(map[common.Address]types.Transactions) + for addr := range pool.locals.accounts { + if pending := pool.pending[addr]; pending != nil { + txs[addr] = append(txs[addr], pending.Flatten()...) + } + if queued := pool.queue[addr]; queued != nil { + txs[addr] = append(txs[addr], queued.Flatten()...) + } + } + return txs +} + // validateTx checks whether a transaction is valid according to the consensus // rules and adheres to some heuristic limits of the local node (price and size). func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { @@ -473,18 +542,22 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { } pool.all[tx.Hash()] = tx pool.priced.Put(tx) + pool.journalTx(from, tx) log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To()) return old != nil, nil } - // New transaction isn't replacing a pending one, push into queue and potentially mark local + // New transaction isn't replacing a pending one, push into queue replace, err := pool.enqueueTx(hash, tx) if err != nil { return false, err } + // Mark local addresses and journal local transactions if local { pool.locals.add(from) } + pool.journalTx(from, tx) + log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To()) return replace, nil } @@ -515,6 +588,18 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, er return old != nil, nil } +// journalTx adds the specified transaction to the local disk journal if it is +// deemed to have been sent from a local account. +func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) { + // Only journal if it's enabled and the transaction is local + if pool.journal == nil || !pool.locals.contains(from) { + return + } + if err := pool.journal.insert(tx); err != nil { + log.Warn("Failed to journal local transaction", "err", err) + } +} + // promoteTx adds a transaction to the pending (processable) list of transactions. // // Note, this method assumes the pool lock is held! @@ -910,39 +995,6 @@ func (pool *TxPool) demoteUnexecutables(state *state.StateDB) { } } -// expirationLoop is a loop that periodically iterates over all accounts with -// queued transactions and drop all that have been inactive for a prolonged amount -// of time. -func (pool *TxPool) expirationLoop() { - defer pool.wg.Done() - - evict := time.NewTicker(evictionInterval) - defer evict.Stop() - - for { - select { - case <-evict.C: - pool.mu.Lock() - for addr := range pool.queue { - // Skip local transactions from the eviction mechanism - if pool.locals.contains(addr) { - continue - } - // Any non-locals old enough should be removed - if time.Since(pool.beats[addr]) > pool.config.Lifetime { - for _, tx := range pool.queue[addr].Flatten() { - pool.removeTx(tx.Hash()) - } - } - } - pool.mu.Unlock() - - case <-pool.quit: - return - } - } -} - // addressByHeartbeat is an account address tagged with its last activity timestamp. type addressByHeartbeat struct { address common.Address @@ -955,7 +1007,7 @@ func (a addresssByHeartbeat) Len() int { return len(a) } func (a addresssByHeartbeat) Less(i, j int) bool { return a[i].heartbeat.Before(a[j].heartbeat) } func (a addresssByHeartbeat) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -// accountSet is simply a set of addresses to check for existance, and a signer +// accountSet is simply a set of addresses to check for existence, and a signer // capable of deriving addresses from transactions. type accountSet struct { accounts map[common.Address]struct{} diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index 03ece3886..80bc0b384 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -19,8 +19,10 @@ package core import ( "crypto/ecdsa" "fmt" + "io/ioutil" "math/big" "math/rand" + "os" "testing" "time" @@ -33,6 +35,15 @@ import ( "github.com/ethereum/go-ethereum/params" ) +// testTxPoolConfig is a transaction pool configuration without stateful disk +// sideeffects used during testing. +var testTxPoolConfig TxPoolConfig + +func init() { + testTxPoolConfig = DefaultTxPoolConfig + testTxPoolConfig.Journal = "" +} + func transaction(nonce uint64, gaslimit *big.Int, key *ecdsa.PrivateKey) *types.Transaction { return pricedTransaction(nonce, gaslimit, big.NewInt(1), key) } @@ -47,8 +58,7 @@ func setupTxPool() (*TxPool, *ecdsa.PrivateKey) { statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) key, _ := crypto.GenerateKey() - pool := NewTxPool(DefaultTxPoolConfig, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) - pool.resetState() + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) return pool, key } @@ -125,9 +135,8 @@ func TestStateChangeDuringPoolReset(t *testing.T) { gasLimitFunc := func() *big.Int { return big.NewInt(1000000000) } - pool := NewTxPool(DefaultTxPoolConfig, params.TestChainConfig, mux, stateFunc, gasLimitFunc) + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, mux, stateFunc, gasLimitFunc) defer pool.Stop() - pool.resetState() nonce := pool.State().GetNonce(address) if nonce != 0 { @@ -618,25 +627,25 @@ func TestTransactionQueueAccountLimiting(t *testing.T) { pool.resetState() // Keep queuing up transactions and make sure all above a limit are dropped - for i := uint64(1); i <= DefaultTxPoolConfig.AccountQueue+5; i++ { + for i := uint64(1); i <= testTxPoolConfig.AccountQueue+5; i++ { if err := pool.AddRemote(transaction(i, big.NewInt(100000), key)); err != nil { t.Fatalf("tx %d: failed to add transaction: %v", i, err) } if len(pool.pending) != 0 { t.Errorf("tx %d: pending pool size mismatch: have %d, want %d", i, len(pool.pending), 0) } - if i <= DefaultTxPoolConfig.AccountQueue { + if i <= testTxPoolConfig.AccountQueue { if pool.queue[account].Len() != int(i) { t.Errorf("tx %d: queue size mismatch: have %d, want %d", i, pool.queue[account].Len(), i) } } else { - if pool.queue[account].Len() != int(DefaultTxPoolConfig.AccountQueue) { - t.Errorf("tx %d: queue limit mismatch: have %d, want %d", i, pool.queue[account].Len(), DefaultTxPoolConfig.AccountQueue) + if pool.queue[account].Len() != int(testTxPoolConfig.AccountQueue) { + t.Errorf("tx %d: queue limit mismatch: have %d, want %d", i, pool.queue[account].Len(), testTxPoolConfig.AccountQueue) } } } - if len(pool.all) != int(DefaultTxPoolConfig.AccountQueue) { - t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), DefaultTxPoolConfig.AccountQueue) + if len(pool.all) != int(testTxPoolConfig.AccountQueue) { + t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), testTxPoolConfig.AccountQueue) } } @@ -657,13 +666,12 @@ func testTransactionQueueGlobalLimiting(t *testing.T, nolocals bool) { db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) - config := DefaultTxPoolConfig + config := testTxPoolConfig config.NoLocals = nolocals config.GlobalQueue = config.AccountQueue*3 - 1 // reduce the queue limits to shorten test time (-1 to make it non divisible) pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) defer pool.Stop() - pool.resetState() // Create a number of test accounts and fund them (last one will be the local) state, _ := pool.currentState() @@ -748,13 +756,12 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) { db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) - config := DefaultTxPoolConfig + config := testTxPoolConfig config.Lifetime = 250 * time.Millisecond config.NoLocals = nolocals pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) defer pool.Stop() - pool.resetState() // Create two test accounts to ensure remotes expire but locals do not local, _ := crypto.GenerateKey() @@ -817,7 +824,7 @@ func TestTransactionPendingLimiting(t *testing.T) { pool.resetState() // Keep queuing up transactions and make sure all above a limit are dropped - for i := uint64(0); i < DefaultTxPoolConfig.AccountQueue+5; i++ { + for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ { if err := pool.AddRemote(transaction(i, big.NewInt(100000), key)); err != nil { t.Fatalf("tx %d: failed to add transaction: %v", i, err) } @@ -828,8 +835,8 @@ func TestTransactionPendingLimiting(t *testing.T) { t.Errorf("tx %d: queue size mismatch: have %d, want %d", i, pool.queue[account].Len(), 0) } } - if len(pool.all) != int(DefaultTxPoolConfig.AccountQueue+5) { - t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), DefaultTxPoolConfig.AccountQueue+5) + if len(pool.all) != int(testTxPoolConfig.AccountQueue+5) { + t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), testTxPoolConfig.AccountQueue+5) } } @@ -845,7 +852,7 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) { state1, _ := pool1.currentState() state1.AddBalance(account1, big.NewInt(1000000)) - for i := uint64(0); i < DefaultTxPoolConfig.AccountQueue+5; i++ { + for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ { if err := pool1.AddRemote(transaction(origin+i, big.NewInt(100000), key1)); err != nil { t.Fatalf("tx %d: failed to add transaction: %v", i, err) } @@ -857,7 +864,7 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) { state2.AddBalance(account2, big.NewInt(1000000)) txns := []*types.Transaction{} - for i := uint64(0); i < DefaultTxPoolConfig.AccountQueue+5; i++ { + for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ { txns = append(txns, transaction(origin+i, big.NewInt(100000), key2)) } pool2.AddRemotes(txns) @@ -888,12 +895,11 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) { db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) - config := DefaultTxPoolConfig + config := testTxPoolConfig config.GlobalSlots = config.AccountSlots * 10 pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) defer pool.Stop() - pool.resetState() // Create a number of test accounts and fund them state, _ := pool.currentState() @@ -935,14 +941,13 @@ func TestTransactionCapClearsFromAll(t *testing.T) { db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) - config := DefaultTxPoolConfig + config := testTxPoolConfig config.AccountSlots = 2 config.AccountQueue = 2 config.GlobalSlots = 8 pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) defer pool.Stop() - pool.resetState() // Create a number of test accounts and fund them state, _ := pool.currentState() @@ -970,12 +975,11 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) { db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) - config := DefaultTxPoolConfig + config := testTxPoolConfig config.GlobalSlots = 0 pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) defer pool.Stop() - pool.resetState() // Create a number of test accounts and fund them state, _ := pool.currentState() @@ -1019,9 +1023,8 @@ func TestTransactionPoolRepricing(t *testing.T) { db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) - pool := NewTxPool(DefaultTxPoolConfig, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) defer pool.Stop() - pool.resetState() // Create a number of test accounts and fund them state, _ := pool.currentState() @@ -1104,13 +1107,12 @@ func TestTransactionPoolUnderpricing(t *testing.T) { db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) - config := DefaultTxPoolConfig + config := testTxPoolConfig config.GlobalSlots = 2 config.GlobalQueue = 2 pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) defer pool.Stop() - pool.resetState() // Create a number of test accounts and fund them state, _ := pool.currentState() @@ -1192,9 +1194,8 @@ func TestTransactionReplacement(t *testing.T) { db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) - pool := NewTxPool(DefaultTxPoolConfig, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) defer pool.Stop() - pool.resetState() // Create a test account to add transactions with key, _ := crypto.GenerateKey() @@ -1204,7 +1205,7 @@ func TestTransactionReplacement(t *testing.T) { // Add pending transactions, ensuring the minimum price bump is enforced for replacement (for ultra low prices too) price := int64(100) - threshold := (price * (100 + int64(DefaultTxPoolConfig.PriceBump))) / 100 + threshold := (price * (100 + int64(testTxPoolConfig.PriceBump))) / 100 if err := pool.AddRemote(pricedTransaction(0, big.NewInt(100000), big.NewInt(1), key)); err != nil { t.Fatalf("failed to add original cheap pending transaction: %v", err) @@ -1250,6 +1251,113 @@ func TestTransactionReplacement(t *testing.T) { } } +// Tests that local transactions are journaled to disk, but remote transactions +// get discarded between restarts. +func TestTransactionJournaling(t *testing.T) { testTransactionJournaling(t, false) } +func TestTransactionJournalingNoLocals(t *testing.T) { testTransactionJournaling(t, true) } + +func testTransactionJournaling(t *testing.T, nolocals bool) { + // Create a temporary file for the journal + file, err := ioutil.TempFile("", "") + if err != nil { + t.Fatalf("failed to create temporary journal: %v", err) + } + journal := file.Name() + defer os.Remove(journal) + + // Clean up the temporary file, we only need the path for now + file.Close() + os.Remove(journal) + + // Create the original pool to inject transaction into the journal + db, _ := ethdb.NewMemDatabase() + statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) + + config := testTxPoolConfig + config.NoLocals = nolocals + config.Journal = journal + config.Rejournal = time.Second + + pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + + // Create two test accounts to ensure remotes expire but locals do not + local, _ := crypto.GenerateKey() + remote, _ := crypto.GenerateKey() + + statedb, _ = pool.currentState() + statedb.AddBalance(crypto.PubkeyToAddress(local.PublicKey), big.NewInt(1000000000)) + statedb.AddBalance(crypto.PubkeyToAddress(remote.PublicKey), big.NewInt(1000000000)) + + // Add three local and a remote transactions and ensure they are queued up + if err := pool.AddLocal(pricedTransaction(0, big.NewInt(100000), big.NewInt(1), local)); err != nil { + t.Fatalf("failed to add local transaction: %v", err) + } + if err := pool.AddLocal(pricedTransaction(1, big.NewInt(100000), big.NewInt(1), local)); err != nil { + t.Fatalf("failed to add local transaction: %v", err) + } + if err := pool.AddLocal(pricedTransaction(2, big.NewInt(100000), big.NewInt(1), local)); err != nil { + t.Fatalf("failed to add local transaction: %v", err) + } + if err := pool.AddRemote(pricedTransaction(0, big.NewInt(100000), big.NewInt(1), remote)); err != nil { + t.Fatalf("failed to add remote transaction: %v", err) + } + pending, queued := pool.stats() + if pending != 4 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 4) + } + if queued != 0 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) + } + if err := validateTxPoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } + // Terminate the old pool, bump the local nonce, create a new pool and ensure relevant transaction survive + pool.Stop() + statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1) + pool = NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + + pending, queued = pool.stats() + if queued != 0 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) + } + if nolocals { + if pending != 0 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0) + } + } else { + if pending != 2 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2) + } + } + if err := validateTxPoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } + // Bump the nonce temporarily and ensure the newly invalidated transaction is removed + statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 2) + pool.resetState() + time.Sleep(2 * config.Rejournal) + pool.Stop() + statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1) + pool = NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + + pending, queued = pool.stats() + if pending != 0 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0) + } + if nolocals { + if queued != 0 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) + } + } else { + if queued != 1 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1) + } + } + if err := validateTxPoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } +} + // Benchmarks the speed of validating the contents of the pending queue of the // transaction pool. func BenchmarkPendingDemotion100(b *testing.B) { benchmarkPendingDemotion(b, 100) } diff --git a/eth/backend.go b/eth/backend.go index c7df517c0..8a837f7b8 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -148,8 +148,10 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { core.WriteChainConfig(chainDb, genesisHash, chainConfig) } - newPool := core.NewTxPool(config.TxPool, eth.chainConfig, eth.EventMux(), eth.blockchain.State, eth.blockchain.GasLimit) - eth.txPool = newPool + if config.TxPool.Journal != "" { + config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal) + } + eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.EventMux(), eth.blockchain.State, eth.blockchain.GasLimit) maxPeers := config.MaxPeers if config.LightServ > 0 {