From 052c634917127b76e9089088860df27598c7ebad Mon Sep 17 00:00:00 2001
From: rjl493456442
Date: Thu, 29 Sep 2022 15:50:24 +0800
Subject: [PATCH] core: rework tx indexer (#25723)

This PR reworks the tx indexer a bit. Compared to the original version, one
scenario is no longer handled - upgrading from a legacy Geth without indexer
support. The tx indexer was introduced in 2020 and has been present through
hardforks ever since, so it can be assumed that all Geth nodes already have
the tx indexer. This lets us simplify the tx indexer logic a bit:

- If the tail flag is not present, the node has just been initialized, with
  or without an ancient store attached. In this case all blocks are regarded
  as unindexed.
- If the tail flag is present, blocks below the tail are unindexed and blocks
  at or above the tail are indexed.

This change also addresses some weird corner cases that could make the
indexer not work after a crash.
---
 core/blockchain.go            | 131 +++++++----
 core/blockchain_test.go       | 267 ++++++++++++++++++++++++++++------
 core/rawdb/accessors_chain.go |   3 +-
 3 files changed, 280 insertions(+), 121 deletions(-)

diff --git a/core/blockchain.go b/core/blockchain.go
index 2d77ecf01..2821fedb0 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -292,22 +292,16 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
 	bc.currentFinalizedBlock.Store(nilBlock)
 	bc.currentSafeBlock.Store(nilBlock)
 
-	// Initialize the chain with ancient data if it isn't empty.
-	var txIndexBlock uint64
-
+	// If Geth is initialized with an external ancient store, re-initialize the
+	// missing chain indexes and chain flags. This procedure can survive a crash
+	// and be resumed on the next restart since the chain flags are updated last.
 	if bc.empty() {
 		rawdb.InitDatabaseFromFreezer(bc.db)
-		// If ancient database is not empty, reconstruct all missing
-		// indices in the background.
-		frozen, _ := bc.db.Ancients()
-		if frozen > 0 {
-			txIndexBlock = frozen
-		}
 	}
+	// Load blockchain states from disk
 	if err := bc.loadLastState(); err != nil {
 		return nil, err
 	}
-
 	// Make sure the state associated with the block is available
 	head := bc.CurrentBlock()
 	if _, err := state.New(head.Root(), bc.stateCache, bc.snaps); err != nil {
@@ -415,14 +409,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
 	bc.wg.Add(1)
 	go bc.updateFutureBlocks()
 
-	// Start tx indexer/unindexer.
-	if txLookupLimit != nil {
-		bc.txLookupLimit = *txLookupLimit
-
-		bc.wg.Add(1)
-		go bc.maintainTxIndex(txIndexBlock)
-	}
-
 	// If periodic cache journal is required, spin it up.
 	if bc.cacheConfig.TrieCleanRejournal > 0 {
 		if bc.cacheConfig.TrieCleanRejournal < time.Minute {
@@ -442,6 +428,13 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
 		bc.SetHead(compat.RewindTo)
 		rawdb.WriteChainConfig(db, genesisHash, chainConfig)
 	}
+	// Start tx indexer/unindexer if required.
+	if txLookupLimit != nil {
+		bc.txLookupLimit = *txLookupLimit
+
+		bc.wg.Add(1)
+		go bc.maintainTxIndex()
+	}
 	return bc, nil
 }
 
@@ -2289,6 +2282,44 @@ func (bc *BlockChain) skipBlock(err error, it *insertIterator) bool {
 	return false
 }
 
+// indexBlocks reindexes or unindexes transactions depending on user configuration
+func (bc *BlockChain) indexBlocks(tail *uint64, head uint64, done chan struct{}) {
+	defer func() { close(done) }()
+
+	// The tail flag is not present, which means the node was just initialized
+	// and all blocks (possibly from an ancient store) are not indexed yet.
+	if tail == nil {
+		from := uint64(0)
+		if bc.txLookupLimit != 0 && head >= bc.txLookupLimit {
+			from = head - bc.txLookupLimit + 1
+		}
+		rawdb.IndexTransactions(bc.db, from, head+1, bc.quit)
+		return
+	}
+	// The tail flag is present, but the whole chain is required to be indexed.
+	if bc.txLookupLimit == 0 || head < bc.txLookupLimit {
+		if *tail > 0 {
+			// It can happen that the chain was rewound to a historical point
+			// even lower than the index tail; cap the indexing target to the
+			// new head to avoid reading non-existent block bodies.
+			end := *tail
+			if end > head+1 {
+				end = head + 1
+			}
+			rawdb.IndexTransactions(bc.db, 0, end, bc.quit)
+		}
+		return
+	}
+	// Update the transaction index to the new chain state
+	if head-bc.txLookupLimit+1 < *tail {
+		// Reindex a part of missing indices and rewind index tail to HEAD-limit
+		rawdb.IndexTransactions(bc.db, head-bc.txLookupLimit+1, *tail, bc.quit)
+	} else {
+		// Unindex a part of stale indices and forward index tail to HEAD-limit
+		rawdb.UnindexTransactions(bc.db, *tail, head-bc.txLookupLimit+1, bc.quit)
+	}
+}
+
 // maintainTxIndex is responsible for the construction and deletion of the
 // transaction index.
 //
@@ -2296,65 +2327,13 @@ func (bc *BlockChain) skipBlock(err error, it *insertIterator) bool {
 // which ancient tx indices get deleted. If `txlookuplimit` is 0, it means
 // all tx indices will be reserved.
 //
-// The user can adjust the txlookuplimit value for each launch after fast
-// sync, Geth will automatically construct the missing indices and delete
-// the extra indices.
-func (bc *BlockChain) maintainTxIndex(ancients uint64) {
+// The user can adjust the txlookuplimit value for each launch after sync;
+// Geth will automatically construct the missing indices or delete the extra
+// indices.
+func (bc *BlockChain) maintainTxIndex() {
 	defer bc.wg.Done()
 
-	// Before starting the actual maintenance, we need to handle a special case,
-	// where user might init Geth with an external ancient database. If so, we
-	// need to reindex all necessary transactions before starting to process any
-	// pruning requests.
-	if ancients > 0 {
-		var from = uint64(0)
-		if bc.txLookupLimit != 0 && ancients > bc.txLookupLimit {
-			from = ancients - bc.txLookupLimit
-		}
-		rawdb.IndexTransactions(bc.db, from, ancients, bc.quit)
-	}
-
-	// indexBlocks reindexes or unindexes transactions depending on user configuration
-	indexBlocks := func(tail *uint64, head uint64, done chan struct{}) {
-		defer func() { done <- struct{}{} }()
-
-		// If the user just upgraded Geth to a new version which supports transaction
-		// index pruning, write the new tail and remove anything older.
-		if tail == nil {
-			if bc.txLookupLimit == 0 || head < bc.txLookupLimit {
-				// Nothing to delete, write the tail and return
-				rawdb.WriteTxIndexTail(bc.db, 0)
-			} else {
-				// Prune all stale tx indices and record the tx index tail
-				rawdb.UnindexTransactions(bc.db, 0, head-bc.txLookupLimit+1, bc.quit)
-			}
-			return
-		}
-		// If a previous indexing existed, make sure that we fill in any missing entries
-		if bc.txLookupLimit == 0 || head < bc.txLookupLimit {
-			if *tail > 0 {
-				// It can happen when chain is rewound to a historical point which
-				// is even lower than the indexes tail, recap the indexing target
-				// to new head to avoid reading non-existent block bodies.
-				end := *tail
-				if end > head+1 {
-					end = head + 1
-				}
-				rawdb.IndexTransactions(bc.db, 0, end, bc.quit)
-			}
-			return
-		}
-		// Update the transaction index to the new chain state
-		if head-bc.txLookupLimit+1 < *tail {
-			// Reindex a part of missing indices and rewind index tail to HEAD-limit
-			rawdb.IndexTransactions(bc.db, head-bc.txLookupLimit+1, *tail, bc.quit)
-		} else {
-			// Unindex a part of stale indices and forward index tail to HEAD-limit
-			rawdb.UnindexTransactions(bc.db, *tail, head-bc.txLookupLimit+1, bc.quit)
-		}
-	}
-
-	// Any reindexing done, start listening to chain events and moving the index window
+	// Listening to chain events and manipulate the transaction indexes.
 	var (
 		done   chan struct{}                  // Non-nil if background unindexing or reindexing routine is active.
 		headCh = make(chan ChainHeadEvent, 1) // Buffered to avoid locking up the event feed
@@ -2370,7 +2349,7 @@ func (bc *BlockChain) maintainTxIndex(ancients uint64) {
 		case head := <-headCh:
 			if done == nil {
 				done = make(chan struct{})
-				go indexBlocks(rawdb.ReadTxIndexTail(bc.db), head.Block.NumberU64(), done)
+				go bc.indexBlocks(rawdb.ReadTxIndexTail(bc.db), head.Block.NumberU64(), done)
 			}
 		case <-done:
 			done = nil
diff --git a/core/blockchain_test.go b/core/blockchain_test.go
index 08061bc24..aba50c862 100644
--- a/core/blockchain_test.go
+++ b/core/blockchain_test.go
@@ -2479,15 +2479,13 @@ func TestTransactionIndices(t *testing.T) {
 		}
 		signer = types.LatestSigner(gspec.Config)
 	)
-	height := uint64(128)
-	genDb, blocks, receipts := GenerateChainWithGenesis(gspec, ethash.NewFaker(), int(height), func(i int, block *BlockGen) {
+	_, blocks, receipts := GenerateChainWithGenesis(gspec, ethash.NewFaker(), 128, func(i int, block *BlockGen) {
 		tx, err := types.SignTx(types.NewTransaction(block.TxNonce(address), common.Address{0x00}, big.NewInt(1000), params.TxGas, block.header.BaseFee, nil), signer, key)
 		if err != nil {
 			panic(err)
 		}
 		block.AddTx(tx)
 	})
-	blocks2, _ := GenerateChain(gspec.Config, blocks[len(blocks)-1], ethash.NewFaker(), genDb, 10, nil)
 
 	check := func(tail *uint64, chain *BlockChain) {
 		stored := rawdb.ReadTxIndexTail(chain.db)
@@ -2522,43 +2520,20 @@ func TestTransactionIndices(t *testing.T) {
 			}
 		}
 	}
-	frdir := t.TempDir()
-	ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false)
-	if err != nil {
-		t.Fatalf("failed to create temp freezer db: %v", err)
-	}
-	// Import all blocks into ancient db
-	l := uint64(0)
-	chain, err := NewBlockChain(ancientDb, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, nil, &l)
-	if err != nil {
-		t.Fatalf("failed to create tester chain: %v", err)
-	}
-	headers := make([]*types.Header, len(blocks))
-	for i, block := range blocks {
-		headers[i] = block.Header()
-	}
-	if n, err := chain.InsertHeaderChain(headers, 0); err != nil {
-		t.Fatalf("failed to insert header %d: %v", n, err)
-	}
-	if n, err := chain.InsertReceiptChain(blocks, receipts, 128); err != nil {
-		t.Fatalf("block %d: failed to insert into chain: %v", n, err)
-	}
-	chain.Stop()
-	ancientDb.Close()
 
-	// Init block chain with external ancients, check all needed indices has been indexed.
 	limit := []uint64{0, 32, 64, 128}
 	for _, l := range limit {
-		ancientDb, err = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false)
-		if err != nil {
-			t.Fatalf("failed to create temp freezer db: %v", err)
-		}
+		frdir := t.TempDir()
+		ancientDb, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false)
+		rawdb.WriteAncientBlocks(ancientDb, append([]*types.Block{gspec.ToBlock()}, blocks...), append([]types.Receipts{{}}, receipts...), big.NewInt(0))
+
 		l := l
-		chain, err = NewBlockChain(ancientDb, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, nil, &l)
+		chain, err := NewBlockChain(ancientDb, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, nil, &l)
 		if err != nil {
 			t.Fatalf("failed to create tester chain: %v", err)
 		}
-		time.Sleep(50 * time.Millisecond) // Wait for indices initialisation
+		chain.indexBlocks(rawdb.ReadTxIndexTail(ancientDb), 128, make(chan struct{}))
+
 		var tail uint64
 		if l != 0 {
 			tail = uint64(128) - l + 1
@@ -2566,26 +2541,27 @@ func TestTransactionIndices(t *testing.T) {
 		check(&tail, chain)
 		chain.Stop()
 		ancientDb.Close()
+		os.RemoveAll(frdir)
 	}
 
 	// Reconstruct a block chain which only reserves HEAD-64 tx indices
-	ancientDb, err = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false)
-	if err != nil {
-		t.Fatalf("failed to create temp freezer db: %v", err)
-	}
+	ancientDb, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false)
 	defer ancientDb.Close()
+	rawdb.WriteAncientBlocks(ancientDb, append([]*types.Block{gspec.ToBlock()}, blocks...), append([]types.Receipts{{}}, receipts...), big.NewInt(0))
 
 	limit = []uint64{0, 64 /* drop stale */, 32 /* shorten history */, 64 /* extend history */, 0 /* restore all */}
-	tails := []uint64{0, 67 /* 130 - 64 + 1 */, 100 /* 131 - 32 + 1 */, 69 /* 132 - 64 + 1 */, 0}
-	for i, l := range limit {
+	for _, l := range limit {
 		l := l
-		chain, err = NewBlockChain(ancientDb, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, nil, &l)
+		chain, err := NewBlockChain(ancientDb, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, nil, &l)
 		if err != nil {
 			t.Fatalf("failed to create tester chain: %v", err)
 		}
-		chain.InsertChain(blocks2[i : i+1]) // Feed chain a higher block to trigger indices updater.
-		time.Sleep(50 * time.Millisecond) // Wait for indices initialisation
-		check(&tails[i], chain)
+		var tail uint64
+		if l != 0 {
+			tail = uint64(128) - l + 1
+		}
+		chain.indexBlocks(rawdb.ReadTxIndexTail(ancientDb), 128, make(chan struct{}))
+		check(&tail, chain)
 		chain.Stop()
 	}
 }
@@ -3784,3 +3760,208 @@ func TestCanonicalHashMarker(t *testing.T) {
 		}
 	}
 }
+
+// TestTxIndexer tests that the tx indexes are updated correctly.
+func TestTxIndexer(t *testing.T) {
+	var (
+		testBankKey, _  = crypto.GenerateKey()
+		testBankAddress = crypto.PubkeyToAddress(testBankKey.PublicKey)
+		testBankFunds   = big.NewInt(1000000000000000000)
+
+		gspec = &Genesis{
+			Config:  params.TestChainConfig,
+			Alloc:   GenesisAlloc{testBankAddress: {Balance: testBankFunds}},
+			BaseFee: big.NewInt(params.InitialBaseFee),
+		}
+		engine = ethash.NewFaker()
+		nonce  = uint64(0)
+	)
+	_, blocks, receipts := GenerateChainWithGenesis(gspec, engine, 128, func(i int, gen *BlockGen) {
+		tx, _ := types.SignTx(types.NewTransaction(nonce, common.HexToAddress("0xdeadbeef"), big.NewInt(1000), params.TxGas, big.NewInt(10*params.InitialBaseFee), nil), types.HomesteadSigner{}, testBankKey)
+		gen.AddTx(tx)
+		nonce += 1
+	})
+
+	// verifyIndexes checks whether the transaction indexes of the specified
+	// block are present or not.
+	verifyIndexes := func(db ethdb.Database, number uint64, exist bool) {
+		if number == 0 {
+			return
+		}
+		block := blocks[number-1]
+		for _, tx := range block.Transactions() {
+			lookup := rawdb.ReadTxLookupEntry(db, tx.Hash())
+			if exist && lookup == nil {
+				t.Fatalf("missing %d %x", number, tx.Hash().Hex())
+			}
+			if !exist && lookup != nil {
+				t.Fatalf("unexpected %d %x", number, tx.Hash().Hex())
+			}
+		}
+	}
+	// verifyRange runs verifyIndexes for a range of blocks, from and to are included.
+	verifyRange := func(db ethdb.Database, from, to uint64, exist bool) {
+		for number := from; number <= to; number += 1 {
+			verifyIndexes(db, number, exist)
+		}
+	}
+	verify := func(db ethdb.Database, expTail uint64) {
+		tail := rawdb.ReadTxIndexTail(db)
+		if tail == nil {
+			t.Fatal("Failed to write tx index tail")
+		}
+		if *tail != expTail {
+			t.Fatalf("Unexpected tx index tail, want %v, got %d", expTail, *tail)
+		}
+		if *tail != 0 {
+			verifyRange(db, 0, *tail-1, false)
+		}
+		verifyRange(db, *tail, 128, true)
+	}
+
+	var cases = []struct {
+		limitA uint64
+		tailA  uint64
+		limitB uint64
+		tailB  uint64
+		limitC uint64
+		tailC  uint64
+	}{
+		{
+			// LimitA: 0
+			// TailA: 0
+			//
+			// all blocks are indexed
+			limitA: 0,
+			tailA:  0,
+
+			// LimitB: 1
+			// TailB: 128
+			//
+			// block-128 is indexed
+			limitB: 1,
+			tailB:  128,
+
+			// LimitC: 64
+			// TailC: 65
+			//
+			// blocks [65, 128] are indexed
+			limitC: 64,
+			tailC:  65,
+		},
+		{
+			// LimitA: 64
+			// TailA: 65
+			//
+			// blocks [65, 128] are indexed
+			limitA: 64,
+			tailA:  65,
+
+			// LimitB: 1
+			// TailB: 128
+			//
+			// block-128 is indexed
+			limitB: 1,
+			tailB:  128,
+
+			// LimitC: 64
+			// TailC: 65
+			//
+			// blocks [65, 128] are indexed
+			limitC: 64,
+			tailC:  65,
+		},
+		{
+			// LimitA: 127
+			// TailA: 2
+			//
+			// blocks [2, 128] are indexed
+			limitA: 127,
+			tailA:  2,
+
+			// LimitB: 1
+			// TailB: 128
+			//
+			// block-128 is indexed
+			limitB: 1,
+			tailB:  128,
+
+			// LimitC: 64
+			// TailC: 65
+			//
+			// blocks [65, 128] are indexed
+			limitC: 64,
+			tailC:  65,
+		},
+		{
+			// LimitA: 128
+			// TailA: 1
+			//
+			// blocks [1, 128] are indexed
+			limitA: 128,
+			tailA:  1,
+
+			// LimitB: 1
+			// TailB: 128
+			//
+			// block-128 is indexed
+			limitB: 1,
+			tailB:  128,
+
+			// LimitC: 64
+			// TailC: 65
+			//
+			// blocks [65, 128] are indexed
+			limitC: 64,
+			tailC:  65,
+		},
+		{
+			// LimitA: 129
+			// TailA: 0
+			//
+			// blocks [0, 128] are indexed
+			limitA: 129,
+			tailA:  0,
+
+			// LimitB: 1
+			// TailB: 128
+			//
+			// block-128 is indexed
+			limitB: 1,
+			tailB:  128,
+
+			// LimitC: 64
+			// TailC: 65
+			//
+			// blocks [65, 128] are indexed
+			limitC: 64,
+			tailC:  65,
+		},
+	}
+	for _, c := range cases {
+		frdir := t.TempDir()
+		db, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false)
+		rawdb.WriteAncientBlocks(db, append([]*types.Block{gspec.ToBlock()}, blocks...), append([]types.Receipts{{}}, receipts...), big.NewInt(0))
+
+		// Index the initial blocks from ancient store
+		chain, _ := NewBlockChain(db, nil, gspec, nil, engine, vm.Config{}, nil, &c.limitA)
+		chain.indexBlocks(nil, 128, make(chan struct{}))
+		verify(db, c.tailA)
+
+		chain.SetTxLookupLimit(c.limitB)
+		chain.indexBlocks(rawdb.ReadTxIndexTail(db), 128, make(chan struct{}))
+		verify(db, c.tailB)
+
+		chain.SetTxLookupLimit(c.limitC)
+		chain.indexBlocks(rawdb.ReadTxIndexTail(db), 128, make(chan struct{}))
+		verify(db, c.tailC)
+
+		// Recover all indexes
+		chain.SetTxLookupLimit(0)
+		chain.indexBlocks(rawdb.ReadTxIndexTail(db), 128, make(chan struct{}))
+		verify(db, 0)
+
+		db.Close()
+		os.RemoveAll(frdir)
+	}
+}
diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go
index aeba3690d..881660aa8 100644
--- a/core/rawdb/accessors_chain.go
+++ b/core/rawdb/accessors_chain.go
@@ -259,8 +259,7 @@ func WriteLastPivotNumber(db ethdb.KeyValueWriter, pivot uint64) {
 }
 
 // ReadTxIndexTail retrieves the number of oldest indexed block
-// whose transaction indices has been indexed. If the corresponding entry
-// is non-existent in database it means the indexing has been finished.
+// whose transaction indices have been indexed.
 func ReadTxIndexTail(db ethdb.KeyValueReader) *uint64 {
 	data, _ := db.Get(txIndexTailKey)
 	if len(data) != 8 {
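
Editor's note: the following is an illustrative, self-contained Go sketch (not part of the patch) of the range-selection rule that the reworked indexBlocks applies, as described in the commit message. The helper name indexingAction and returning a textual description are assumptions made purely for the example; the real code instead calls rawdb.IndexTransactions / rawdb.UnindexTransactions with a quit channel.

package main

import "fmt"

// indexingAction reports which block range would be indexed or unindexed for
// a given index tail, chain head and txlookuplimit, mirroring the branch
// structure of the new BlockChain.indexBlocks: a missing tail means nothing
// is indexed yet; otherwise blocks in [tail, head] are already indexed.
func indexingAction(tail *uint64, head, limit uint64) string {
	// No tail flag: the node is freshly initialized, index the most recent
	// `limit` blocks (or everything when limit == 0).
	if tail == nil {
		from := uint64(0)
		if limit != 0 && head >= limit {
			from = head - limit + 1
		}
		return fmt.Sprintf("index [%d, %d]", from, head)
	}
	// Whole chain must be indexed: extend the index down to block 0.
	if limit == 0 || head < limit {
		if *tail == 0 {
			return "nothing to do"
		}
		end := *tail
		if end > head+1 {
			end = head + 1 // chain was rewound below the old tail
		}
		return fmt.Sprintf("index [0, %d]", end-1)
	}
	// Move the tail towards head-limit+1, either by reindexing older blocks
	// or by unindexing stale ones.
	if head-limit+1 < *tail {
		return fmt.Sprintf("index [%d, %d]", head-limit+1, *tail-1)
	}
	return fmt.Sprintf("unindex [%d, %d]", *tail, head-limit)
}

func main() {
	tail := uint64(65)
	fmt.Println(indexingAction(nil, 128, 64))    // index [65, 128]
	fmt.Println(indexingAction(&tail, 128, 0))   // index [0, 64]   (restore all)
	fmt.Println(indexingAction(&tail, 128, 128)) // index [1, 64]   (extend history)
	fmt.Println(indexingAction(&tail, 128, 32))  // unindex [65, 96] (shorten history)
}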