diff --git a/core/blockchain.go b/core/blockchain.go index 2d77ecf01..2821fedb0 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -292,22 +292,16 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis bc.currentFinalizedBlock.Store(nilBlock) bc.currentSafeBlock.Store(nilBlock) - // Initialize the chain with ancient data if it isn't empty. - var txIndexBlock uint64 - + // If Geth is initialized with an external ancient store, re-initialize the + // missing chain indexes and chain flags. This procedure can survive crash + // and can be resumed in next restart since chain flags are updated in last step. if bc.empty() { rawdb.InitDatabaseFromFreezer(bc.db) - // If ancient database is not empty, reconstruct all missing - // indices in the background. - frozen, _ := bc.db.Ancients() - if frozen > 0 { - txIndexBlock = frozen - } } + // Load blockchain states from disk if err := bc.loadLastState(); err != nil { return nil, err } - // Make sure the state associated with the block is available head := bc.CurrentBlock() if _, err := state.New(head.Root(), bc.stateCache, bc.snaps); err != nil { @@ -415,14 +409,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis bc.wg.Add(1) go bc.updateFutureBlocks() - // Start tx indexer/unindexer. - if txLookupLimit != nil { - bc.txLookupLimit = *txLookupLimit - - bc.wg.Add(1) - go bc.maintainTxIndex(txIndexBlock) - } - // If periodic cache journal is required, spin it up. if bc.cacheConfig.TrieCleanRejournal > 0 { if bc.cacheConfig.TrieCleanRejournal < time.Minute { @@ -442,6 +428,13 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis bc.SetHead(compat.RewindTo) rawdb.WriteChainConfig(db, genesisHash, chainConfig) } + // Start tx indexer/unindexer if required. + if txLookupLimit != nil { + bc.txLookupLimit = *txLookupLimit + + bc.wg.Add(1) + go bc.maintainTxIndex() + } return bc, nil } @@ -2289,6 +2282,44 @@ func (bc *BlockChain) skipBlock(err error, it *insertIterator) bool { return false } +// indexBlocks reindexes or unindexes transactions depending on user configuration +func (bc *BlockChain) indexBlocks(tail *uint64, head uint64, done chan struct{}) { + defer func() { close(done) }() + + // The tail flag is not existent, it means the node is just initialized + // and all blocks(may from ancient store) are not indexed yet. + if tail == nil { + from := uint64(0) + if bc.txLookupLimit != 0 && head >= bc.txLookupLimit { + from = head - bc.txLookupLimit + 1 + } + rawdb.IndexTransactions(bc.db, from, head+1, bc.quit) + return + } + // The tail flag is existent, but the whole chain is required to be indexed. + if bc.txLookupLimit == 0 || head < bc.txLookupLimit { + if *tail > 0 { + // It can happen when chain is rewound to a historical point which + // is even lower than the indexes tail, recap the indexing target + // to new head to avoid reading non-existent block bodies. + end := *tail + if end > head+1 { + end = head + 1 + } + rawdb.IndexTransactions(bc.db, 0, end, bc.quit) + } + return + } + // Update the transaction index to the new chain state + if head-bc.txLookupLimit+1 < *tail { + // Reindex a part of missing indices and rewind index tail to HEAD-limit + rawdb.IndexTransactions(bc.db, head-bc.txLookupLimit+1, *tail, bc.quit) + } else { + // Unindex a part of stale indices and forward index tail to HEAD-limit + rawdb.UnindexTransactions(bc.db, *tail, head-bc.txLookupLimit+1, bc.quit) + } +} + // maintainTxIndex is responsible for the construction and deletion of the // transaction index. // @@ -2296,65 +2327,13 @@ func (bc *BlockChain) skipBlock(err error, it *insertIterator) bool { // which ancient tx indices get deleted. If `txlookuplimit` is 0, it means // all tx indices will be reserved. // -// The user can adjust the txlookuplimit value for each launch after fast -// sync, Geth will automatically construct the missing indices and delete -// the extra indices. -func (bc *BlockChain) maintainTxIndex(ancients uint64) { +// The user can adjust the txlookuplimit value for each launch after sync, +// Geth will automatically construct the missing indices or delete the extra +// indices. +func (bc *BlockChain) maintainTxIndex() { defer bc.wg.Done() - // Before starting the actual maintenance, we need to handle a special case, - // where user might init Geth with an external ancient database. If so, we - // need to reindex all necessary transactions before starting to process any - // pruning requests. - if ancients > 0 { - var from = uint64(0) - if bc.txLookupLimit != 0 && ancients > bc.txLookupLimit { - from = ancients - bc.txLookupLimit - } - rawdb.IndexTransactions(bc.db, from, ancients, bc.quit) - } - - // indexBlocks reindexes or unindexes transactions depending on user configuration - indexBlocks := func(tail *uint64, head uint64, done chan struct{}) { - defer func() { done <- struct{}{} }() - - // If the user just upgraded Geth to a new version which supports transaction - // index pruning, write the new tail and remove anything older. - if tail == nil { - if bc.txLookupLimit == 0 || head < bc.txLookupLimit { - // Nothing to delete, write the tail and return - rawdb.WriteTxIndexTail(bc.db, 0) - } else { - // Prune all stale tx indices and record the tx index tail - rawdb.UnindexTransactions(bc.db, 0, head-bc.txLookupLimit+1, bc.quit) - } - return - } - // If a previous indexing existed, make sure that we fill in any missing entries - if bc.txLookupLimit == 0 || head < bc.txLookupLimit { - if *tail > 0 { - // It can happen when chain is rewound to a historical point which - // is even lower than the indexes tail, recap the indexing target - // to new head to avoid reading non-existent block bodies. - end := *tail - if end > head+1 { - end = head + 1 - } - rawdb.IndexTransactions(bc.db, 0, end, bc.quit) - } - return - } - // Update the transaction index to the new chain state - if head-bc.txLookupLimit+1 < *tail { - // Reindex a part of missing indices and rewind index tail to HEAD-limit - rawdb.IndexTransactions(bc.db, head-bc.txLookupLimit+1, *tail, bc.quit) - } else { - // Unindex a part of stale indices and forward index tail to HEAD-limit - rawdb.UnindexTransactions(bc.db, *tail, head-bc.txLookupLimit+1, bc.quit) - } - } - - // Any reindexing done, start listening to chain events and moving the index window + // Listening to chain events and manipulate the transaction indexes. var ( done chan struct{} // Non-nil if background unindexing or reindexing routine is active. headCh = make(chan ChainHeadEvent, 1) // Buffered to avoid locking up the event feed @@ -2370,7 +2349,7 @@ func (bc *BlockChain) maintainTxIndex(ancients uint64) { case head := <-headCh: if done == nil { done = make(chan struct{}) - go indexBlocks(rawdb.ReadTxIndexTail(bc.db), head.Block.NumberU64(), done) + go bc.indexBlocks(rawdb.ReadTxIndexTail(bc.db), head.Block.NumberU64(), done) } case <-done: done = nil diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 08061bc24..aba50c862 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -2479,15 +2479,13 @@ func TestTransactionIndices(t *testing.T) { } signer = types.LatestSigner(gspec.Config) ) - height := uint64(128) - genDb, blocks, receipts := GenerateChainWithGenesis(gspec, ethash.NewFaker(), int(height), func(i int, block *BlockGen) { + _, blocks, receipts := GenerateChainWithGenesis(gspec, ethash.NewFaker(), 128, func(i int, block *BlockGen) { tx, err := types.SignTx(types.NewTransaction(block.TxNonce(address), common.Address{0x00}, big.NewInt(1000), params.TxGas, block.header.BaseFee, nil), signer, key) if err != nil { panic(err) } block.AddTx(tx) }) - blocks2, _ := GenerateChain(gspec.Config, blocks[len(blocks)-1], ethash.NewFaker(), genDb, 10, nil) check := func(tail *uint64, chain *BlockChain) { stored := rawdb.ReadTxIndexTail(chain.db) @@ -2522,43 +2520,20 @@ func TestTransactionIndices(t *testing.T) { } } } - frdir := t.TempDir() - ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false) - if err != nil { - t.Fatalf("failed to create temp freezer db: %v", err) - } - // Import all blocks into ancient db - l := uint64(0) - chain, err := NewBlockChain(ancientDb, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, nil, &l) - if err != nil { - t.Fatalf("failed to create tester chain: %v", err) - } - headers := make([]*types.Header, len(blocks)) - for i, block := range blocks { - headers[i] = block.Header() - } - if n, err := chain.InsertHeaderChain(headers, 0); err != nil { - t.Fatalf("failed to insert header %d: %v", n, err) - } - if n, err := chain.InsertReceiptChain(blocks, receipts, 128); err != nil { - t.Fatalf("block %d: failed to insert into chain: %v", n, err) - } - chain.Stop() - ancientDb.Close() - // Init block chain with external ancients, check all needed indices has been indexed. limit := []uint64{0, 32, 64, 128} for _, l := range limit { - ancientDb, err = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false) - if err != nil { - t.Fatalf("failed to create temp freezer db: %v", err) - } + frdir := t.TempDir() + ancientDb, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false) + rawdb.WriteAncientBlocks(ancientDb, append([]*types.Block{gspec.ToBlock()}, blocks...), append([]types.Receipts{{}}, receipts...), big.NewInt(0)) + l := l - chain, err = NewBlockChain(ancientDb, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, nil, &l) + chain, err := NewBlockChain(ancientDb, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, nil, &l) if err != nil { t.Fatalf("failed to create tester chain: %v", err) } - time.Sleep(50 * time.Millisecond) // Wait for indices initialisation + chain.indexBlocks(rawdb.ReadTxIndexTail(ancientDb), 128, make(chan struct{})) + var tail uint64 if l != 0 { tail = uint64(128) - l + 1 @@ -2566,26 +2541,27 @@ func TestTransactionIndices(t *testing.T) { check(&tail, chain) chain.Stop() ancientDb.Close() + os.RemoveAll(frdir) } // Reconstruct a block chain which only reserves HEAD-64 tx indices - ancientDb, err = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false) - if err != nil { - t.Fatalf("failed to create temp freezer db: %v", err) - } + ancientDb, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false) defer ancientDb.Close() + rawdb.WriteAncientBlocks(ancientDb, append([]*types.Block{gspec.ToBlock()}, blocks...), append([]types.Receipts{{}}, receipts...), big.NewInt(0)) limit = []uint64{0, 64 /* drop stale */, 32 /* shorten history */, 64 /* extend history */, 0 /* restore all */} - tails := []uint64{0, 67 /* 130 - 64 + 1 */, 100 /* 131 - 32 + 1 */, 69 /* 132 - 64 + 1 */, 0} - for i, l := range limit { + for _, l := range limit { l := l - chain, err = NewBlockChain(ancientDb, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, nil, &l) + chain, err := NewBlockChain(ancientDb, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, nil, &l) if err != nil { t.Fatalf("failed to create tester chain: %v", err) } - chain.InsertChain(blocks2[i : i+1]) // Feed chain a higher block to trigger indices updater. - time.Sleep(50 * time.Millisecond) // Wait for indices initialisation - check(&tails[i], chain) + var tail uint64 + if l != 0 { + tail = uint64(128) - l + 1 + } + chain.indexBlocks(rawdb.ReadTxIndexTail(ancientDb), 128, make(chan struct{})) + check(&tail, chain) chain.Stop() } } @@ -3784,3 +3760,208 @@ func TestCanonicalHashMarker(t *testing.T) { } } } + +// TestTxIndexer tests the tx indexes are updated correctly. +func TestTxIndexer(t *testing.T) { + var ( + testBankKey, _ = crypto.GenerateKey() + testBankAddress = crypto.PubkeyToAddress(testBankKey.PublicKey) + testBankFunds = big.NewInt(1000000000000000000) + + gspec = &Genesis{ + Config: params.TestChainConfig, + Alloc: GenesisAlloc{testBankAddress: {Balance: testBankFunds}}, + BaseFee: big.NewInt(params.InitialBaseFee), + } + engine = ethash.NewFaker() + nonce = uint64(0) + ) + _, blocks, receipts := GenerateChainWithGenesis(gspec, engine, 128, func(i int, gen *BlockGen) { + tx, _ := types.SignTx(types.NewTransaction(nonce, common.HexToAddress("0xdeadbeef"), big.NewInt(1000), params.TxGas, big.NewInt(10*params.InitialBaseFee), nil), types.HomesteadSigner{}, testBankKey) + gen.AddTx(tx) + nonce += 1 + }) + + // verifyIndexes checks if the transaction indexes are present or not + // of the specified block. + verifyIndexes := func(db ethdb.Database, number uint64, exist bool) { + if number == 0 { + return + } + block := blocks[number-1] + for _, tx := range block.Transactions() { + lookup := rawdb.ReadTxLookupEntry(db, tx.Hash()) + if exist && lookup == nil { + t.Fatalf("missing %d %x", number, tx.Hash().Hex()) + } + if !exist && lookup != nil { + t.Fatalf("unexpected %d %x", number, tx.Hash().Hex()) + } + } + } + // verifyRange runs verifyIndexes for a range of blocks, from and to are included. + verifyRange := func(db ethdb.Database, from, to uint64, exist bool) { + for number := from; number <= to; number += 1 { + verifyIndexes(db, number, exist) + } + } + verify := func(db ethdb.Database, expTail uint64) { + tail := rawdb.ReadTxIndexTail(db) + if tail == nil { + t.Fatal("Failed to write tx index tail") + } + if *tail != expTail { + t.Fatalf("Unexpected tx index tail, want %v, got %d", expTail, *tail) + } + if *tail != 0 { + verifyRange(db, 0, *tail-1, false) + } + verifyRange(db, *tail, 128, true) + } + + var cases = []struct { + limitA uint64 + tailA uint64 + limitB uint64 + tailB uint64 + limitC uint64 + tailC uint64 + }{ + { + // LimitA: 0 + // TailA: 0 + // + // all blocks are indexed + limitA: 0, + tailA: 0, + + // LimitB: 1 + // TailB: 128 + // + // block-128 is indexed + limitB: 1, + tailB: 128, + + // LimitB: 64 + // TailB: 65 + // + // block [65, 128] are indexed + limitC: 64, + tailC: 65, + }, + { + // LimitA: 64 + // TailA: 65 + // + // block [65, 128] are indexed + limitA: 64, + tailA: 65, + + // LimitB: 1 + // TailB: 128 + // + // block-128 is indexed + limitB: 1, + tailB: 128, + + // LimitB: 64 + // TailB: 65 + // + // block [65, 128] are indexed + limitC: 64, + tailC: 65, + }, + { + // LimitA: 127 + // TailA: 2 + // + // block [2, 128] are indexed + limitA: 127, + tailA: 2, + + // LimitB: 1 + // TailB: 128 + // + // block-128 is indexed + limitB: 1, + tailB: 128, + + // LimitB: 64 + // TailB: 65 + // + // block [65, 128] are indexed + limitC: 64, + tailC: 65, + }, + { + // LimitA: 128 + // TailA: 1 + // + // block [2, 128] are indexed + limitA: 128, + tailA: 1, + + // LimitB: 1 + // TailB: 128 + // + // block-128 is indexed + limitB: 1, + tailB: 128, + + // LimitB: 64 + // TailB: 65 + // + // block [65, 128] are indexed + limitC: 64, + tailC: 65, + }, + { + // LimitA: 129 + // TailA: 0 + // + // block [0, 128] are indexed + limitA: 129, + tailA: 0, + + // LimitB: 1 + // TailB: 128 + // + // block-128 is indexed + limitB: 1, + tailB: 128, + + // LimitB: 64 + // TailB: 65 + // + // block [65, 128] are indexed + limitC: 64, + tailC: 65, + }, + } + for _, c := range cases { + frdir := t.TempDir() + db, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false) + rawdb.WriteAncientBlocks(db, append([]*types.Block{gspec.ToBlock()}, blocks...), append([]types.Receipts{{}}, receipts...), big.NewInt(0)) + + // Index the initial blocks from ancient store + chain, _ := NewBlockChain(db, nil, gspec, nil, engine, vm.Config{}, nil, &c.limitA) + chain.indexBlocks(nil, 128, make(chan struct{})) + verify(db, c.tailA) + + chain.SetTxLookupLimit(c.limitB) + chain.indexBlocks(rawdb.ReadTxIndexTail(db), 128, make(chan struct{})) + verify(db, c.tailB) + + chain.SetTxLookupLimit(c.limitC) + chain.indexBlocks(rawdb.ReadTxIndexTail(db), 128, make(chan struct{})) + verify(db, c.tailC) + + // Recover all indexes + chain.SetTxLookupLimit(0) + chain.indexBlocks(rawdb.ReadTxIndexTail(db), 128, make(chan struct{})) + verify(db, 0) + + db.Close() + os.RemoveAll(frdir) + } +} diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index aeba3690d..881660aa8 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -259,8 +259,7 @@ func WriteLastPivotNumber(db ethdb.KeyValueWriter, pivot uint64) { } // ReadTxIndexTail retrieves the number of oldest indexed block -// whose transaction indices has been indexed. If the corresponding entry -// is non-existent in database it means the indexing has been finished. +// whose transaction indices has been indexed. func ReadTxIndexTail(db ethdb.KeyValueReader) *uint64 { data, _ := db.Get(txIndexTailKey) if len(data) != 8 {