diff --git a/core/blockchain.go b/core/blockchain.go index ce1edd9b7..356340ae1 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -345,6 +345,8 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par go bc.update() if txLookupLimit != nil { bc.txLookupLimit = *txLookupLimit + + bc.wg.Add(1) go bc.maintainTxIndex(txIndexBlock) } // If periodic cache journal is required, spin it up. @@ -2230,6 +2232,8 @@ func (bc *BlockChain) update() { // sync, Geth will automatically construct the missing indices and delete // the extra indices. func (bc *BlockChain) maintainTxIndex(ancients uint64) { + defer bc.wg.Done() + // Before starting the actual maintenance, we need to handle a special case, // where user might init Geth with an external ancient database. If so, we // need to reindex all necessary transactions before starting to process any @@ -2239,7 +2243,7 @@ func (bc *BlockChain) maintainTxIndex(ancients uint64) { if bc.txLookupLimit != 0 && ancients > bc.txLookupLimit { from = ancients - bc.txLookupLimit } - rawdb.IndexTransactions(bc.db, from, ancients) + rawdb.IndexTransactions(bc.db, from, ancients, bc.quit) } // indexBlocks reindexes or unindexes transactions depending on user configuration indexBlocks := func(tail *uint64, head uint64, done chan struct{}) { @@ -2253,24 +2257,24 @@ func (bc *BlockChain) maintainTxIndex(ancients uint64) { rawdb.WriteTxIndexTail(bc.db, 0) } else { // Prune all stale tx indices and record the tx index tail - rawdb.UnindexTransactions(bc.db, 0, head-bc.txLookupLimit+1) + rawdb.UnindexTransactions(bc.db, 0, head-bc.txLookupLimit+1, bc.quit) } return } // If a previous indexing existed, make sure that we fill in any missing entries if bc.txLookupLimit == 0 || head < bc.txLookupLimit { if *tail > 0 { - rawdb.IndexTransactions(bc.db, 0, *tail) + rawdb.IndexTransactions(bc.db, 0, *tail, bc.quit) } return } // Update the transaction index to the new chain state if head-bc.txLookupLimit+1 < *tail { // Reindex a part of missing indices and rewind index tail to HEAD-limit - rawdb.IndexTransactions(bc.db, head-bc.txLookupLimit+1, *tail) + rawdb.IndexTransactions(bc.db, head-bc.txLookupLimit+1, *tail, bc.quit) } else { // Unindex a part of stale indices and forward index tail to HEAD-limit - rawdb.UnindexTransactions(bc.db, *tail, head-bc.txLookupLimit+1) + rawdb.UnindexTransactions(bc.db, *tail, head-bc.txLookupLimit+1, bc.quit) } } // Any reindexing done, start listening to chain events and moving the index window @@ -2294,6 +2298,10 @@ func (bc *BlockChain) maintainTxIndex(ancients uint64) { case <-done: done = nil case <-bc.quit: + if done != nil { + log.Info("Waiting background transaction indexer to exit") + <-done + } return } } diff --git a/core/rawdb/chain_iterator.go b/core/rawdb/chain_iterator.go index 3130e922e..393b72c26 100644 --- a/core/rawdb/chain_iterator.go +++ b/core/rawdb/chain_iterator.go @@ -84,15 +84,17 @@ type blockTxHashes struct { } // iterateTransactions iterates over all transactions in the (canon) block -// number(s) given, and yields the hashes on a channel -func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool) (chan *blockTxHashes, chan struct{}) { +// number(s) given, and yields the hashes on a channel. If there is a signal +// received from interrupt channel, the iteration will be aborted and result +// channel will be closed. +func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool, interrupt chan struct{}) chan *blockTxHashes { // One thread sequentially reads data from db type numberRlp struct { number uint64 rlp rlp.RawValue } if to == from { - return nil, nil + return nil } threads := to - from if cpus := runtime.NumCPU(); threads > uint64(cpus) { @@ -101,7 +103,6 @@ func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool var ( rlpCh = make(chan *numberRlp, threads*2) // we send raw rlp over this channel hashesCh = make(chan *blockTxHashes, threads*2) // send hashes over hashesCh - abortCh = make(chan struct{}) ) // lookup runs in one instance lookup := func() { @@ -115,7 +116,7 @@ func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool // Feed the block to the aggregator, or abort on interrupt select { case rlpCh <- &numberRlp{n, data}: - case <-abortCh: + case <-interrupt: return } if reverse { @@ -168,7 +169,7 @@ func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool // Feed the block to the aggregator, or abort on interrupt select { case hashesCh <- result: - case <-abortCh: + case <-interrupt: return } } @@ -177,25 +178,28 @@ func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool for i := 0; i < int(threads); i++ { go process() } - return hashesCh, abortCh + return hashesCh } -// IndexTransactions creates txlookup indices of the specified block range. +// indexTransactions creates txlookup indices of the specified block range. // // This function iterates canonical chain in reverse order, it has one main advantage: // We can write tx index tail flag periodically even without the whole indexing // procedure is finished. So that we can resume indexing procedure next time quickly. -func IndexTransactions(db ethdb.Database, from uint64, to uint64) { +// +// There is a passed channel, the whole procedure will be interrupted if any +// signal received. +func indexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) { // short circuit for invalid range if from >= to { return } var ( - hashesCh, abortCh = iterateTransactions(db, from, to, true) - batch = db.NewBatch() - start = time.Now() - logged = start.Add(-7 * time.Second) - // Since we iterate in reverse, we expect the first number to come + hashesCh = iterateTransactions(db, from, to, true, interrupt) + batch = db.NewBatch() + start = time.Now() + logged = start.Add(-7 * time.Second) + // Since we iterate in reverse, we expect the first number to come // in to be [to-1]. Therefore, setting lastNum to means that the // prqueue gap-evaluation will work correctly lastNum = to @@ -203,8 +207,6 @@ func IndexTransactions(db ethdb.Database, from uint64, to uint64) { // for stats reporting blocks, txs = 0, 0 ) - defer close(abortCh) - for chanDelivery := range hashesCh { // Push the delivery into the queue and process contiguous ranges. // Since we iterate in reverse, so lower numbers have lower prio, and @@ -215,6 +217,10 @@ func IndexTransactions(db ethdb.Database, from uint64, to uint64) { if _, priority := queue.Peek(); priority != int64(lastNum-1) { break } + // For testing + if hook != nil && !hook(lastNum-1) { + break + } // Next block available, pop it off and index it delivery := queue.PopItem().(*blockTxHashes) lastNum = delivery.number @@ -223,8 +229,7 @@ func IndexTransactions(db ethdb.Database, from uint64, to uint64) { txs += len(delivery.hashes) // If enough data was accumulated in memory or we're at the last block, dump to disk if batch.ValueSize() > ethdb.IdealBatchSize { - // Also write the tail there - WriteTxIndexTail(batch, lastNum) + WriteTxIndexTail(batch, lastNum) // Also write the tail here if err := batch.Write(); err != nil { log.Crit("Failed writing batch to db", "error", err) return @@ -238,67 +243,122 @@ func IndexTransactions(db ethdb.Database, from uint64, to uint64) { } } } - if lastNum < to { - WriteTxIndexTail(batch, lastNum) - // No need to write the batch if we never entered the loop above... + // If there exists uncommitted data, flush them. + if batch.ValueSize() > 0 { + WriteTxIndexTail(batch, lastNum) // Also write the tail there if err := batch.Write(); err != nil { log.Crit("Failed writing batch to db", "error", err) return } } - log.Info("Indexed transactions", "blocks", blocks, "txs", txs, "tail", lastNum, "elapsed", common.PrettyDuration(time.Since(start))) + select { + case <-interrupt: + log.Debug("Transaction indexing interrupted", "blocks", blocks, "txs", txs, "tail", lastNum, "elapsed", common.PrettyDuration(time.Since(start))) + default: + log.Info("Indexed transactions", "blocks", blocks, "txs", txs, "tail", lastNum, "elapsed", common.PrettyDuration(time.Since(start))) + } } -// UnindexTransactions removes txlookup indices of the specified block range. -func UnindexTransactions(db ethdb.Database, from uint64, to uint64) { +// IndexTransactions creates txlookup indices of the specified block range. +// +// This function iterates canonical chain in reverse order, it has one main advantage: +// We can write tx index tail flag periodically even without the whole indexing +// procedure is finished. So that we can resume indexing procedure next time quickly. +// +// There is a passed channel, the whole procedure will be interrupted if any +// signal received. +func IndexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}) { + indexTransactions(db, from, to, interrupt, nil) +} + +// indexTransactionsForTesting is the internal debug version with an additional hook. +func indexTransactionsForTesting(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) { + indexTransactions(db, from, to, interrupt, hook) +} + +// unindexTransactions removes txlookup indices of the specified block range. +// +// There is a passed channel, the whole procedure will be interrupted if any +// signal received. +func unindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) { // short circuit for invalid range if from >= to { return } - // Write flag first and then unindex the transaction indices. Some indices - // will be left in the database if crash happens but it's fine. - WriteTxIndexTail(db, to) - // If only one block is unindexed, do it directly - //if from+1 == to { - // data := ReadCanonicalBodyRLP(db, uint64(from)) - // DeleteTxLookupEntries(db, ReadBlock(db, ReadCanonicalHash(db, from), from)) - // log.Info("Unindexed transactions", "blocks", 1, "tail", to) - // return - //} - // TODO @holiman, add this back (if we want it) var ( - hashesCh, abortCh = iterateTransactions(db, from, to, false) - batch = db.NewBatch() - start = time.Now() - logged = start.Add(-7 * time.Second) + hashesCh = iterateTransactions(db, from, to, false, interrupt) + batch = db.NewBatch() + start = time.Now() + logged = start.Add(-7 * time.Second) + // we expect the first number to come in to be [from]. Therefore, setting + // nextNum to from means that the prqueue gap-evaluation will work correctly + nextNum = from + queue = prque.New(nil) + // for stats reporting + blocks, txs = 0, 0 ) - defer close(abortCh) // Otherwise spin up the concurrent iterator and unindexer - blocks, txs := 0, 0 for delivery := range hashesCh { - DeleteTxLookupEntries(batch, delivery.hashes) - txs += len(delivery.hashes) - blocks++ - - // If enough data was accumulated in memory or we're at the last block, dump to disk - // A batch counts the size of deletion as '1', so we need to flush more - // often than that. - if blocks%1000 == 0 { - if err := batch.Write(); err != nil { - log.Crit("Failed writing batch to db", "error", err) - return + // Push the delivery into the queue and process contiguous ranges. + queue.Push(delivery, -int64(delivery.number)) + for !queue.Empty() { + // If the next available item is gapped, return + if _, priority := queue.Peek(); -priority != int64(nextNum) { + break + } + // For testing + if hook != nil && !hook(nextNum) { + break + } + delivery := queue.PopItem().(*blockTxHashes) + nextNum = delivery.number + 1 + DeleteTxLookupEntries(batch, delivery.hashes) + txs += len(delivery.hashes) + blocks++ + + // If enough data was accumulated in memory or we're at the last block, dump to disk + // A batch counts the size of deletion as '1', so we need to flush more + // often than that. + if blocks%1000 == 0 { + WriteTxIndexTail(batch, nextNum) + if err := batch.Write(); err != nil { + log.Crit("Failed writing batch to db", "error", err) + return + } + batch.Reset() + } + // If we've spent too much time already, notify the user of what we're doing + if time.Since(logged) > 8*time.Second { + log.Info("Unindexing transactions", "blocks", blocks, "txs", txs, "total", to-from, "elapsed", common.PrettyDuration(time.Since(start))) + logged = time.Now() } - batch.Reset() - } - // If we've spent too much time already, notify the user of what we're doing - if time.Since(logged) > 8*time.Second { - log.Info("Unindexing transactions", "blocks", blocks, "txs", txs, "total", to-from, "elapsed", common.PrettyDuration(time.Since(start))) - logged = time.Now() } } - if err := batch.Write(); err != nil { - log.Crit("Failed writing batch to db", "error", err) - return + // Commit the last batch if there exists uncommitted data + if batch.ValueSize() > 0 { + WriteTxIndexTail(batch, nextNum) + if err := batch.Write(); err != nil { + log.Crit("Failed writing batch to db", "error", err) + return + } + } + select { + case <-interrupt: + log.Debug("Transaction unindexing interrupted", "blocks", blocks, "txs", txs, "tail", to, "elapsed", common.PrettyDuration(time.Since(start))) + default: + log.Info("Unindexed transactions", "blocks", blocks, "txs", txs, "tail", to, "elapsed", common.PrettyDuration(time.Since(start))) } - log.Info("Unindexed transactions", "blocks", blocks, "txs", txs, "tail", to, "elapsed", common.PrettyDuration(time.Since(start))) +} + +// UnindexTransactions removes txlookup indices of the specified block range. +// +// There is a passed channel, the whole procedure will be interrupted if any +// signal received. +func UnindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}) { + unindexTransactions(db, from, to, interrupt, nil) +} + +// unindexTransactionsForTesting is the internal debug version with an additional hook. +func unindexTransactionsForTesting(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) { + unindexTransactions(db, from, to, interrupt, hook) } diff --git a/core/rawdb/chain_iterator_test.go b/core/rawdb/chain_iterator_test.go index c635cd2f1..90b2639d3 100644 --- a/core/rawdb/chain_iterator_test.go +++ b/core/rawdb/chain_iterator_test.go @@ -20,6 +20,7 @@ import ( "math/big" "reflect" "sort" + "sync" "testing" "github.com/ethereum/go-ethereum/common" @@ -59,7 +60,7 @@ func TestChainIterator(t *testing.T) { } for i, c := range cases { var numbers []int - hashCh, _ := iterateTransactions(chainDb, c.from, c.to, c.reverse) + hashCh := iterateTransactions(chainDb, c.from, c.to, c.reverse, nil) if hashCh != nil { for h := range hashCh { numbers = append(numbers, int(h.number)) @@ -80,3 +81,85 @@ func TestChainIterator(t *testing.T) { } } } + +func TestIndexTransactions(t *testing.T) { + // Construct test chain db + chainDb := NewMemoryDatabase() + + var block *types.Block + var txs []*types.Transaction + for i := uint64(0); i <= 10; i++ { + if i == 0 { + block = types.NewBlock(&types.Header{Number: big.NewInt(int64(i))}, nil, nil, nil, newHasher()) // Empty genesis block + } else { + tx := types.NewTransaction(i, common.BytesToAddress([]byte{0x11}), big.NewInt(111), 1111, big.NewInt(11111), []byte{0x11, 0x11, 0x11}) + txs = append(txs, tx) + block = types.NewBlock(&types.Header{Number: big.NewInt(int64(i))}, []*types.Transaction{tx}, nil, nil, newHasher()) + } + WriteBlock(chainDb, block) + WriteCanonicalHash(chainDb, block.Hash(), block.NumberU64()) + } + // verify checks whether the tx indices in the range [from, to) + // is expected. + verify := func(from, to int, exist bool, tail uint64) { + for i := from; i < to; i++ { + if i == 0 { + continue + } + number := ReadTxLookupEntry(chainDb, txs[i-1].Hash()) + if exist && number == nil { + t.Fatalf("Transaction indice missing") + } + if !exist && number != nil { + t.Fatalf("Transaction indice is not deleted") + } + } + number := ReadTxIndexTail(chainDb) + if number == nil || *number != tail { + t.Fatalf("Transaction tail mismatch") + } + } + IndexTransactions(chainDb, 5, 11, nil) + verify(5, 11, true, 5) + verify(0, 5, false, 5) + + IndexTransactions(chainDb, 0, 5, nil) + verify(0, 11, true, 0) + + UnindexTransactions(chainDb, 0, 5, nil) + verify(5, 11, true, 5) + verify(0, 5, false, 5) + + UnindexTransactions(chainDb, 5, 11, nil) + verify(0, 11, false, 11) + + // Testing corner cases + signal := make(chan struct{}) + var once sync.Once + indexTransactionsForTesting(chainDb, 5, 11, signal, func(n uint64) bool { + if n <= 8 { + once.Do(func() { + close(signal) + }) + return false + } + return true + }) + verify(9, 11, true, 9) + verify(0, 9, false, 9) + IndexTransactions(chainDb, 0, 9, nil) + + signal = make(chan struct{}) + var once2 sync.Once + unindexTransactionsForTesting(chainDb, 0, 11, signal, func(n uint64) bool { + if n >= 8 { + once2.Do(func() { + close(signal) + }) + return false + } + return true + }) + verify(8, 11, true, 8) + verify(0, 8, false, 8) +}