core: rework tx indexer (#25723)
This PR reworks tx indexer a bit. Compared to the original version, one scenario is no longer handled - upgrading from legacy geth without indexer support. The tx indexer was introduced in 2020 and have been present through hardforks, so it can be assumed that all Geth nodes have tx indexer already. So we can simplify the tx indexer logic a bit: - If the tail flag is not present, it means node is just initialized may or may not with an ancient store attached. In this case all blocks are regarded as unindexed - If the tail flag is present, it means blocks below tail are unindexed, blocks above tail are indexed This change also address some weird cornercases that could make the indexer not work after a crash.
This commit is contained in:
parent
88132afc3f
commit
052c634917
@ -292,22 +292,16 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
|
||||
bc.currentFinalizedBlock.Store(nilBlock)
|
||||
bc.currentSafeBlock.Store(nilBlock)
|
||||
|
||||
// Initialize the chain with ancient data if it isn't empty.
|
||||
var txIndexBlock uint64
|
||||
|
||||
// If Geth is initialized with an external ancient store, re-initialize the
|
||||
// missing chain indexes and chain flags. This procedure can survive crash
|
||||
// and can be resumed in next restart since chain flags are updated in last step.
|
||||
if bc.empty() {
|
||||
rawdb.InitDatabaseFromFreezer(bc.db)
|
||||
// If ancient database is not empty, reconstruct all missing
|
||||
// indices in the background.
|
||||
frozen, _ := bc.db.Ancients()
|
||||
if frozen > 0 {
|
||||
txIndexBlock = frozen
|
||||
}
|
||||
}
|
||||
// Load blockchain states from disk
|
||||
if err := bc.loadLastState(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Make sure the state associated with the block is available
|
||||
head := bc.CurrentBlock()
|
||||
if _, err := state.New(head.Root(), bc.stateCache, bc.snaps); err != nil {
|
||||
@ -415,14 +409,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
|
||||
bc.wg.Add(1)
|
||||
go bc.updateFutureBlocks()
|
||||
|
||||
// Start tx indexer/unindexer.
|
||||
if txLookupLimit != nil {
|
||||
bc.txLookupLimit = *txLookupLimit
|
||||
|
||||
bc.wg.Add(1)
|
||||
go bc.maintainTxIndex(txIndexBlock)
|
||||
}
|
||||
|
||||
// If periodic cache journal is required, spin it up.
|
||||
if bc.cacheConfig.TrieCleanRejournal > 0 {
|
||||
if bc.cacheConfig.TrieCleanRejournal < time.Minute {
|
||||
@ -442,6 +428,13 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
|
||||
bc.SetHead(compat.RewindTo)
|
||||
rawdb.WriteChainConfig(db, genesisHash, chainConfig)
|
||||
}
|
||||
// Start tx indexer/unindexer if required.
|
||||
if txLookupLimit != nil {
|
||||
bc.txLookupLimit = *txLookupLimit
|
||||
|
||||
bc.wg.Add(1)
|
||||
go bc.maintainTxIndex()
|
||||
}
|
||||
return bc, nil
|
||||
}
|
||||
|
||||
@ -2289,48 +2282,21 @@ func (bc *BlockChain) skipBlock(err error, it *insertIterator) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// maintainTxIndex is responsible for the construction and deletion of the
|
||||
// transaction index.
|
||||
//
|
||||
// User can use flag `txlookuplimit` to specify a "recentness" block, below
|
||||
// which ancient tx indices get deleted. If `txlookuplimit` is 0, it means
|
||||
// all tx indices will be reserved.
|
||||
//
|
||||
// The user can adjust the txlookuplimit value for each launch after fast
|
||||
// sync, Geth will automatically construct the missing indices and delete
|
||||
// the extra indices.
|
||||
func (bc *BlockChain) maintainTxIndex(ancients uint64) {
|
||||
defer bc.wg.Done()
|
||||
// indexBlocks reindexes or unindexes transactions depending on user configuration
|
||||
func (bc *BlockChain) indexBlocks(tail *uint64, head uint64, done chan struct{}) {
|
||||
defer func() { close(done) }()
|
||||
|
||||
// Before starting the actual maintenance, we need to handle a special case,
|
||||
// where user might init Geth with an external ancient database. If so, we
|
||||
// need to reindex all necessary transactions before starting to process any
|
||||
// pruning requests.
|
||||
if ancients > 0 {
|
||||
var from = uint64(0)
|
||||
if bc.txLookupLimit != 0 && ancients > bc.txLookupLimit {
|
||||
from = ancients - bc.txLookupLimit
|
||||
}
|
||||
rawdb.IndexTransactions(bc.db, from, ancients, bc.quit)
|
||||
}
|
||||
|
||||
// indexBlocks reindexes or unindexes transactions depending on user configuration
|
||||
indexBlocks := func(tail *uint64, head uint64, done chan struct{}) {
|
||||
defer func() { done <- struct{}{} }()
|
||||
|
||||
// If the user just upgraded Geth to a new version which supports transaction
|
||||
// index pruning, write the new tail and remove anything older.
|
||||
// The tail flag is not existent, it means the node is just initialized
|
||||
// and all blocks(may from ancient store) are not indexed yet.
|
||||
if tail == nil {
|
||||
if bc.txLookupLimit == 0 || head < bc.txLookupLimit {
|
||||
// Nothing to delete, write the tail and return
|
||||
rawdb.WriteTxIndexTail(bc.db, 0)
|
||||
} else {
|
||||
// Prune all stale tx indices and record the tx index tail
|
||||
rawdb.UnindexTransactions(bc.db, 0, head-bc.txLookupLimit+1, bc.quit)
|
||||
from := uint64(0)
|
||||
if bc.txLookupLimit != 0 && head >= bc.txLookupLimit {
|
||||
from = head - bc.txLookupLimit + 1
|
||||
}
|
||||
rawdb.IndexTransactions(bc.db, from, head+1, bc.quit)
|
||||
return
|
||||
}
|
||||
// If a previous indexing existed, make sure that we fill in any missing entries
|
||||
// The tail flag is existent, but the whole chain is required to be indexed.
|
||||
if bc.txLookupLimit == 0 || head < bc.txLookupLimit {
|
||||
if *tail > 0 {
|
||||
// It can happen when chain is rewound to a historical point which
|
||||
@ -2352,9 +2318,22 @@ func (bc *BlockChain) maintainTxIndex(ancients uint64) {
|
||||
// Unindex a part of stale indices and forward index tail to HEAD-limit
|
||||
rawdb.UnindexTransactions(bc.db, *tail, head-bc.txLookupLimit+1, bc.quit)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Any reindexing done, start listening to chain events and moving the index window
|
||||
// maintainTxIndex is responsible for the construction and deletion of the
|
||||
// transaction index.
|
||||
//
|
||||
// User can use flag `txlookuplimit` to specify a "recentness" block, below
|
||||
// which ancient tx indices get deleted. If `txlookuplimit` is 0, it means
|
||||
// all tx indices will be reserved.
|
||||
//
|
||||
// The user can adjust the txlookuplimit value for each launch after sync,
|
||||
// Geth will automatically construct the missing indices or delete the extra
|
||||
// indices.
|
||||
func (bc *BlockChain) maintainTxIndex() {
|
||||
defer bc.wg.Done()
|
||||
|
||||
// Listening to chain events and manipulate the transaction indexes.
|
||||
var (
|
||||
done chan struct{} // Non-nil if background unindexing or reindexing routine is active.
|
||||
headCh = make(chan ChainHeadEvent, 1) // Buffered to avoid locking up the event feed
|
||||
@ -2370,7 +2349,7 @@ func (bc *BlockChain) maintainTxIndex(ancients uint64) {
|
||||
case head := <-headCh:
|
||||
if done == nil {
|
||||
done = make(chan struct{})
|
||||
go indexBlocks(rawdb.ReadTxIndexTail(bc.db), head.Block.NumberU64(), done)
|
||||
go bc.indexBlocks(rawdb.ReadTxIndexTail(bc.db), head.Block.NumberU64(), done)
|
||||
}
|
||||
case <-done:
|
||||
done = nil
|
||||
|
@ -2479,15 +2479,13 @@ func TestTransactionIndices(t *testing.T) {
|
||||
}
|
||||
signer = types.LatestSigner(gspec.Config)
|
||||
)
|
||||
height := uint64(128)
|
||||
genDb, blocks, receipts := GenerateChainWithGenesis(gspec, ethash.NewFaker(), int(height), func(i int, block *BlockGen) {
|
||||
_, blocks, receipts := GenerateChainWithGenesis(gspec, ethash.NewFaker(), 128, func(i int, block *BlockGen) {
|
||||
tx, err := types.SignTx(types.NewTransaction(block.TxNonce(address), common.Address{0x00}, big.NewInt(1000), params.TxGas, block.header.BaseFee, nil), signer, key)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
block.AddTx(tx)
|
||||
})
|
||||
blocks2, _ := GenerateChain(gspec.Config, blocks[len(blocks)-1], ethash.NewFaker(), genDb, 10, nil)
|
||||
|
||||
check := func(tail *uint64, chain *BlockChain) {
|
||||
stored := rawdb.ReadTxIndexTail(chain.db)
|
||||
@ -2522,43 +2520,20 @@ func TestTransactionIndices(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
// Init block chain with external ancients, check all needed indices has been indexed.
|
||||
limit := []uint64{0, 32, 64, 128}
|
||||
for _, l := range limit {
|
||||
frdir := t.TempDir()
|
||||
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create temp freezer db: %v", err)
|
||||
}
|
||||
// Import all blocks into ancient db
|
||||
l := uint64(0)
|
||||
ancientDb, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false)
|
||||
rawdb.WriteAncientBlocks(ancientDb, append([]*types.Block{gspec.ToBlock()}, blocks...), append([]types.Receipts{{}}, receipts...), big.NewInt(0))
|
||||
|
||||
l := l
|
||||
chain, err := NewBlockChain(ancientDb, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, nil, &l)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create tester chain: %v", err)
|
||||
}
|
||||
headers := make([]*types.Header, len(blocks))
|
||||
for i, block := range blocks {
|
||||
headers[i] = block.Header()
|
||||
}
|
||||
if n, err := chain.InsertHeaderChain(headers, 0); err != nil {
|
||||
t.Fatalf("failed to insert header %d: %v", n, err)
|
||||
}
|
||||
if n, err := chain.InsertReceiptChain(blocks, receipts, 128); err != nil {
|
||||
t.Fatalf("block %d: failed to insert into chain: %v", n, err)
|
||||
}
|
||||
chain.Stop()
|
||||
ancientDb.Close()
|
||||
chain.indexBlocks(rawdb.ReadTxIndexTail(ancientDb), 128, make(chan struct{}))
|
||||
|
||||
// Init block chain with external ancients, check all needed indices has been indexed.
|
||||
limit := []uint64{0, 32, 64, 128}
|
||||
for _, l := range limit {
|
||||
ancientDb, err = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create temp freezer db: %v", err)
|
||||
}
|
||||
l := l
|
||||
chain, err = NewBlockChain(ancientDb, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, nil, &l)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create tester chain: %v", err)
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond) // Wait for indices initialisation
|
||||
var tail uint64
|
||||
if l != 0 {
|
||||
tail = uint64(128) - l + 1
|
||||
@ -2566,26 +2541,27 @@ func TestTransactionIndices(t *testing.T) {
|
||||
check(&tail, chain)
|
||||
chain.Stop()
|
||||
ancientDb.Close()
|
||||
os.RemoveAll(frdir)
|
||||
}
|
||||
|
||||
// Reconstruct a block chain which only reserves HEAD-64 tx indices
|
||||
ancientDb, err = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create temp freezer db: %v", err)
|
||||
}
|
||||
ancientDb, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false)
|
||||
defer ancientDb.Close()
|
||||
|
||||
rawdb.WriteAncientBlocks(ancientDb, append([]*types.Block{gspec.ToBlock()}, blocks...), append([]types.Receipts{{}}, receipts...), big.NewInt(0))
|
||||
limit = []uint64{0, 64 /* drop stale */, 32 /* shorten history */, 64 /* extend history */, 0 /* restore all */}
|
||||
tails := []uint64{0, 67 /* 130 - 64 + 1 */, 100 /* 131 - 32 + 1 */, 69 /* 132 - 64 + 1 */, 0}
|
||||
for i, l := range limit {
|
||||
for _, l := range limit {
|
||||
l := l
|
||||
chain, err = NewBlockChain(ancientDb, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, nil, &l)
|
||||
chain, err := NewBlockChain(ancientDb, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, nil, &l)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create tester chain: %v", err)
|
||||
}
|
||||
chain.InsertChain(blocks2[i : i+1]) // Feed chain a higher block to trigger indices updater.
|
||||
time.Sleep(50 * time.Millisecond) // Wait for indices initialisation
|
||||
check(&tails[i], chain)
|
||||
var tail uint64
|
||||
if l != 0 {
|
||||
tail = uint64(128) - l + 1
|
||||
}
|
||||
chain.indexBlocks(rawdb.ReadTxIndexTail(ancientDb), 128, make(chan struct{}))
|
||||
check(&tail, chain)
|
||||
chain.Stop()
|
||||
}
|
||||
}
|
||||
@ -3784,3 +3760,208 @@ func TestCanonicalHashMarker(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestTxIndexer tests the tx indexes are updated correctly.
|
||||
func TestTxIndexer(t *testing.T) {
|
||||
var (
|
||||
testBankKey, _ = crypto.GenerateKey()
|
||||
testBankAddress = crypto.PubkeyToAddress(testBankKey.PublicKey)
|
||||
testBankFunds = big.NewInt(1000000000000000000)
|
||||
|
||||
gspec = &Genesis{
|
||||
Config: params.TestChainConfig,
|
||||
Alloc: GenesisAlloc{testBankAddress: {Balance: testBankFunds}},
|
||||
BaseFee: big.NewInt(params.InitialBaseFee),
|
||||
}
|
||||
engine = ethash.NewFaker()
|
||||
nonce = uint64(0)
|
||||
)
|
||||
_, blocks, receipts := GenerateChainWithGenesis(gspec, engine, 128, func(i int, gen *BlockGen) {
|
||||
tx, _ := types.SignTx(types.NewTransaction(nonce, common.HexToAddress("0xdeadbeef"), big.NewInt(1000), params.TxGas, big.NewInt(10*params.InitialBaseFee), nil), types.HomesteadSigner{}, testBankKey)
|
||||
gen.AddTx(tx)
|
||||
nonce += 1
|
||||
})
|
||||
|
||||
// verifyIndexes checks if the transaction indexes are present or not
|
||||
// of the specified block.
|
||||
verifyIndexes := func(db ethdb.Database, number uint64, exist bool) {
|
||||
if number == 0 {
|
||||
return
|
||||
}
|
||||
block := blocks[number-1]
|
||||
for _, tx := range block.Transactions() {
|
||||
lookup := rawdb.ReadTxLookupEntry(db, tx.Hash())
|
||||
if exist && lookup == nil {
|
||||
t.Fatalf("missing %d %x", number, tx.Hash().Hex())
|
||||
}
|
||||
if !exist && lookup != nil {
|
||||
t.Fatalf("unexpected %d %x", number, tx.Hash().Hex())
|
||||
}
|
||||
}
|
||||
}
|
||||
// verifyRange runs verifyIndexes for a range of blocks, from and to are included.
|
||||
verifyRange := func(db ethdb.Database, from, to uint64, exist bool) {
|
||||
for number := from; number <= to; number += 1 {
|
||||
verifyIndexes(db, number, exist)
|
||||
}
|
||||
}
|
||||
verify := func(db ethdb.Database, expTail uint64) {
|
||||
tail := rawdb.ReadTxIndexTail(db)
|
||||
if tail == nil {
|
||||
t.Fatal("Failed to write tx index tail")
|
||||
}
|
||||
if *tail != expTail {
|
||||
t.Fatalf("Unexpected tx index tail, want %v, got %d", expTail, *tail)
|
||||
}
|
||||
if *tail != 0 {
|
||||
verifyRange(db, 0, *tail-1, false)
|
||||
}
|
||||
verifyRange(db, *tail, 128, true)
|
||||
}
|
||||
|
||||
var cases = []struct {
|
||||
limitA uint64
|
||||
tailA uint64
|
||||
limitB uint64
|
||||
tailB uint64
|
||||
limitC uint64
|
||||
tailC uint64
|
||||
}{
|
||||
{
|
||||
// LimitA: 0
|
||||
// TailA: 0
|
||||
//
|
||||
// all blocks are indexed
|
||||
limitA: 0,
|
||||
tailA: 0,
|
||||
|
||||
// LimitB: 1
|
||||
// TailB: 128
|
||||
//
|
||||
// block-128 is indexed
|
||||
limitB: 1,
|
||||
tailB: 128,
|
||||
|
||||
// LimitB: 64
|
||||
// TailB: 65
|
||||
//
|
||||
// block [65, 128] are indexed
|
||||
limitC: 64,
|
||||
tailC: 65,
|
||||
},
|
||||
{
|
||||
// LimitA: 64
|
||||
// TailA: 65
|
||||
//
|
||||
// block [65, 128] are indexed
|
||||
limitA: 64,
|
||||
tailA: 65,
|
||||
|
||||
// LimitB: 1
|
||||
// TailB: 128
|
||||
//
|
||||
// block-128 is indexed
|
||||
limitB: 1,
|
||||
tailB: 128,
|
||||
|
||||
// LimitB: 64
|
||||
// TailB: 65
|
||||
//
|
||||
// block [65, 128] are indexed
|
||||
limitC: 64,
|
||||
tailC: 65,
|
||||
},
|
||||
{
|
||||
// LimitA: 127
|
||||
// TailA: 2
|
||||
//
|
||||
// block [2, 128] are indexed
|
||||
limitA: 127,
|
||||
tailA: 2,
|
||||
|
||||
// LimitB: 1
|
||||
// TailB: 128
|
||||
//
|
||||
// block-128 is indexed
|
||||
limitB: 1,
|
||||
tailB: 128,
|
||||
|
||||
// LimitB: 64
|
||||
// TailB: 65
|
||||
//
|
||||
// block [65, 128] are indexed
|
||||
limitC: 64,
|
||||
tailC: 65,
|
||||
},
|
||||
{
|
||||
// LimitA: 128
|
||||
// TailA: 1
|
||||
//
|
||||
// block [2, 128] are indexed
|
||||
limitA: 128,
|
||||
tailA: 1,
|
||||
|
||||
// LimitB: 1
|
||||
// TailB: 128
|
||||
//
|
||||
// block-128 is indexed
|
||||
limitB: 1,
|
||||
tailB: 128,
|
||||
|
||||
// LimitB: 64
|
||||
// TailB: 65
|
||||
//
|
||||
// block [65, 128] are indexed
|
||||
limitC: 64,
|
||||
tailC: 65,
|
||||
},
|
||||
{
|
||||
// LimitA: 129
|
||||
// TailA: 0
|
||||
//
|
||||
// block [0, 128] are indexed
|
||||
limitA: 129,
|
||||
tailA: 0,
|
||||
|
||||
// LimitB: 1
|
||||
// TailB: 128
|
||||
//
|
||||
// block-128 is indexed
|
||||
limitB: 1,
|
||||
tailB: 128,
|
||||
|
||||
// LimitB: 64
|
||||
// TailB: 65
|
||||
//
|
||||
// block [65, 128] are indexed
|
||||
limitC: 64,
|
||||
tailC: 65,
|
||||
},
|
||||
}
|
||||
for _, c := range cases {
|
||||
frdir := t.TempDir()
|
||||
db, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false)
|
||||
rawdb.WriteAncientBlocks(db, append([]*types.Block{gspec.ToBlock()}, blocks...), append([]types.Receipts{{}}, receipts...), big.NewInt(0))
|
||||
|
||||
// Index the initial blocks from ancient store
|
||||
chain, _ := NewBlockChain(db, nil, gspec, nil, engine, vm.Config{}, nil, &c.limitA)
|
||||
chain.indexBlocks(nil, 128, make(chan struct{}))
|
||||
verify(db, c.tailA)
|
||||
|
||||
chain.SetTxLookupLimit(c.limitB)
|
||||
chain.indexBlocks(rawdb.ReadTxIndexTail(db), 128, make(chan struct{}))
|
||||
verify(db, c.tailB)
|
||||
|
||||
chain.SetTxLookupLimit(c.limitC)
|
||||
chain.indexBlocks(rawdb.ReadTxIndexTail(db), 128, make(chan struct{}))
|
||||
verify(db, c.tailC)
|
||||
|
||||
// Recover all indexes
|
||||
chain.SetTxLookupLimit(0)
|
||||
chain.indexBlocks(rawdb.ReadTxIndexTail(db), 128, make(chan struct{}))
|
||||
verify(db, 0)
|
||||
|
||||
db.Close()
|
||||
os.RemoveAll(frdir)
|
||||
}
|
||||
}
|
||||
|
@ -259,8 +259,7 @@ func WriteLastPivotNumber(db ethdb.KeyValueWriter, pivot uint64) {
|
||||
}
|
||||
|
||||
// ReadTxIndexTail retrieves the number of oldest indexed block
|
||||
// whose transaction indices has been indexed. If the corresponding entry
|
||||
// is non-existent in database it means the indexing has been finished.
|
||||
// whose transaction indices has been indexed.
|
||||
func ReadTxIndexTail(db ethdb.KeyValueReader) *uint64 {
|
||||
data, _ := db.Get(txIndexTailKey)
|
||||
if len(data) != 8 {
|
||||
|
Loading…
Reference in New Issue
Block a user