From 1c45f2f42e6578258e74c790d063b8639ec58bc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 8 Aug 2017 11:59:34 +0300 Subject: [PATCH] core: fix txpool journal and test races --- core/tx_pool.go | 19 +++++++++++++--- core/tx_pool_test.go | 54 ++++++++++++++++++++++---------------------- 2 files changed, 43 insertions(+), 30 deletions(-) diff --git a/core/tx_pool.go b/core/tx_pool.go index 16f774265..b0c251f92 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -207,7 +207,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, eventMux *e } pool.locals = newAccountSet(pool.signer) pool.priced = newTxPricedList(&pool.all) - pool.resetState() + pool.reset() // If local transactions and journaling is enabled, load from disk if !config.NoLocals && config.Journal != "" { @@ -261,7 +261,7 @@ func (pool *TxPool) loop() { pool.homestead = true } } - pool.resetState() + pool.reset() pool.mu.Unlock() case RemovedTransactionEvent: @@ -300,15 +300,28 @@ func (pool *TxPool) loop() { // Handle local transaction journal rotation case <-journal.C: if pool.journal != nil { + pool.mu.Lock() if err := pool.journal.rotate(pool.local()); err != nil { log.Warn("Failed to rotate local tx journal", "err", err) } + pool.mu.Unlock() } } } } -func (pool *TxPool) resetState() { +// lockedReset is a wrapper around reset to allow calling it in a thread safe +// manner. This method is only ever used in the tester! +func (pool *TxPool) lockedReset() { + pool.mu.Lock() + defer pool.mu.Unlock() + + pool.reset() +} + +// reset retrieves the current state of the blockchain and ensures the content +// of the transaction pool is valid with regard to the chain state. +func (pool *TxPool) reset() { currentState, err := pool.currentState() if err != nil { log.Error("Failed reset txpool state", "err", err) diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index 020d6bedd..fcb330051 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -153,7 +153,7 @@ func TestStateChangeDuringPoolReset(t *testing.T) { // trigger state change in the background trigger = true - pool.resetState() + pool.lockedReset() pendingTx, err := pool.Pending() if err != nil { @@ -213,7 +213,7 @@ func TestTransactionQueue(t *testing.T) { from, _ := deriveSender(tx) currentState, _ := pool.currentState() currentState.AddBalance(from, big.NewInt(1000)) - pool.resetState() + pool.lockedReset() pool.enqueueTx(tx.Hash(), tx) pool.promoteExecutables(currentState, []common.Address{from}) @@ -243,7 +243,7 @@ func TestTransactionQueue(t *testing.T) { from, _ = deriveSender(tx1) currentState, _ = pool.currentState() currentState.AddBalance(from, big.NewInt(1000)) - pool.resetState() + pool.lockedReset() pool.enqueueTx(tx1.Hash(), tx1) pool.enqueueTx(tx2.Hash(), tx2) @@ -314,7 +314,7 @@ func TestTransactionChainFork(t *testing.T) { pool.currentState = func() (*state.StateDB, error) { return statedb, nil } currentState, _ := pool.currentState() currentState.AddBalance(addr, big.NewInt(100000000000000)) - pool.resetState() + pool.lockedReset() } resetState() @@ -342,7 +342,7 @@ func TestTransactionDoubleNonce(t *testing.T) { pool.currentState = func() (*state.StateDB, error) { return statedb, nil } currentState, _ := pool.currentState() currentState.AddBalance(addr, big.NewInt(100000000000000)) - pool.resetState() + pool.lockedReset() } resetState() @@ -412,14 +412,14 @@ func TestNonceRecovery(t *testing.T) { currentState, _ := pool.currentState() currentState.SetNonce(addr, n) currentState.AddBalance(addr, big.NewInt(100000000000000)) - pool.resetState() + pool.lockedReset() tx := transaction(n, big.NewInt(100000), key) if err := pool.AddRemote(tx); err != nil { t.Error(err) } // simulate some weird re-order of transactions and missing nonce(s) currentState.SetNonce(addr, n-1) - pool.resetState() + pool.lockedReset() if fn := pool.pendingState.GetNonce(addr); fn != n+1 { t.Errorf("expected nonce to be %d, got %d", n+1, fn) } @@ -433,7 +433,7 @@ func TestRemovedTxEvent(t *testing.T) { from, _ := deriveSender(tx) currentState, _ := pool.currentState() currentState.AddBalance(from, big.NewInt(1000000000000)) - pool.resetState() + pool.lockedReset() pool.eventMux.Post(RemovedTransactionEvent{types.Transactions{tx}}) pool.eventMux.Post(ChainHeadEvent{nil}) if pool.pending[from].Len() != 1 { @@ -482,7 +482,7 @@ func TestTransactionDropping(t *testing.T) { if len(pool.all) != 6 { t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 6) } - pool.resetState() + pool.lockedReset() if pool.pending[account].Len() != 3 { t.Errorf("pending transaction mismatch: have %d, want %d", pool.pending[account].Len(), 3) } @@ -494,7 +494,7 @@ func TestTransactionDropping(t *testing.T) { } // Reduce the balance of the account, and check that invalidated transactions are dropped state.AddBalance(account, big.NewInt(-650)) - pool.resetState() + pool.lockedReset() if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok { t.Errorf("funded pending transaction missing: %v", tx0) @@ -519,7 +519,7 @@ func TestTransactionDropping(t *testing.T) { } // Reduce the block gas limit, check that invalidated transactions are dropped pool.gasLimit = func() *big.Int { return big.NewInt(100) } - pool.resetState() + pool.lockedReset() if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok { t.Errorf("funded pending transaction missing: %v", tx0) @@ -573,7 +573,7 @@ func TestTransactionPostponing(t *testing.T) { if len(pool.all) != len(txns) { t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), len(txns)) } - pool.resetState() + pool.lockedReset() if pool.pending[account].Len() != len(txns) { t.Errorf("pending transaction mismatch: have %d, want %d", pool.pending[account].Len(), len(txns)) } @@ -585,7 +585,7 @@ func TestTransactionPostponing(t *testing.T) { } // Reduce the balance of the account, and check that transactions are reorganised state.AddBalance(account, big.NewInt(-750)) - pool.resetState() + pool.lockedReset() if _, ok := pool.pending[account].txs.items[txns[0].Nonce()]; !ok { t.Errorf("tx %d: valid and funded transaction missing from pending pool: %v", 0, txns[0]) @@ -626,7 +626,7 @@ func TestTransactionQueueAccountLimiting(t *testing.T) { state, _ := pool.currentState() state.AddBalance(account, big.NewInt(1000000)) - pool.resetState() + pool.lockedReset() // Keep queuing up transactions and make sure all above a limit are dropped for i := uint64(1); i <= testTxPoolConfig.AccountQueue+5; i++ { @@ -780,7 +780,7 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) { if err := pool.AddRemote(pricedTransaction(1, big.NewInt(100000), big.NewInt(1), remote)); err != nil { t.Fatalf("failed to add remote transaction: %v", err) } - pending, queued := pool.stats() + pending, queued := pool.Stats() if pending != 0 { t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0) } @@ -793,7 +793,7 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) { // Wait a bit for eviction to run and clean up any leftovers, and ensure only the local remains time.Sleep(2 * config.Lifetime) - pending, queued = pool.stats() + pending, queued = pool.Stats() if pending != 0 { t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0) } @@ -823,7 +823,7 @@ func TestTransactionPendingLimiting(t *testing.T) { state, _ := pool.currentState() state.AddBalance(account, big.NewInt(1000000)) - pool.resetState() + pool.lockedReset() // Keep queuing up transactions and make sure all above a limit are dropped for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ { @@ -1057,7 +1057,7 @@ func TestTransactionPoolRepricing(t *testing.T) { pool.AddRemotes(txs) pool.AddLocal(ltx) - pending, queued := pool.stats() + pending, queued := pool.Stats() if pending != 4 { t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 4) } @@ -1070,7 +1070,7 @@ func TestTransactionPoolRepricing(t *testing.T) { // Reprice the pool and check that underpriced transactions get dropped pool.SetGasPrice(big.NewInt(2)) - pending, queued = pool.stats() + pending, queued = pool.Stats() if pending != 2 { t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2) } @@ -1095,7 +1095,7 @@ func TestTransactionPoolRepricing(t *testing.T) { if err := pool.AddLocal(tx); err != nil { t.Fatalf("failed to add underpriced local transaction: %v", err) } - if pending, _ = pool.stats(); pending != 3 { + if pending, _ = pool.Stats(); pending != 3 { t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3) } if err := validateTxPoolInternals(pool); err != nil { @@ -1142,7 +1142,7 @@ func TestTransactionPoolUnderpricing(t *testing.T) { pool.AddRemotes(txs) pool.AddLocal(ltx) - pending, queued := pool.stats() + pending, queued := pool.Stats() if pending != 3 { t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3) } @@ -1166,7 +1166,7 @@ func TestTransactionPoolUnderpricing(t *testing.T) { if err := pool.AddRemote(pricedTransaction(3, big.NewInt(100000), big.NewInt(5), keys[1])); err != nil { t.Fatalf("failed to add well priced transaction: %v", err) } - pending, queued = pool.stats() + pending, queued = pool.Stats() if pending != 2 { t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2) } @@ -1181,7 +1181,7 @@ func TestTransactionPoolUnderpricing(t *testing.T) { if err := pool.AddLocal(tx); err != nil { t.Fatalf("failed to add underpriced local transaction: %v", err) } - pending, queued = pool.stats() + pending, queued = pool.Stats() if pending != 2 { t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2) } @@ -1307,7 +1307,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) { if err := pool.AddRemote(pricedTransaction(0, big.NewInt(100000), big.NewInt(1), remote)); err != nil { t.Fatalf("failed to add remote transaction: %v", err) } - pending, queued := pool.stats() + pending, queued := pool.Stats() if pending != 4 { t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 4) } @@ -1322,7 +1322,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) { statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1) pool = NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) - pending, queued = pool.stats() + pending, queued = pool.Stats() if queued != 0 { t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) } @@ -1340,13 +1340,13 @@ func testTransactionJournaling(t *testing.T, nolocals bool) { } // Bump the nonce temporarily and ensure the newly invalidated transaction is removed statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 2) - pool.resetState() + pool.lockedReset() time.Sleep(2 * config.Rejournal) pool.Stop() statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1) pool = NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) - pending, queued = pool.stats() + pending, queued = pool.Stats() if pending != 0 { t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0) }