core, core/rawdb, eth/sync: no tx indexing during snap sync (#28703)

This change simplifies the logic for indexing transactions and enhances the UX when transaction is not found by returning more information to users.

Transaction indexing is now considered as a part of the initial sync, and `eth.syncing` will thus be `true` if transaction indexing is not yet finished. API consumers can use the syncing status to determine if the node is ready to serve users.
This commit is contained in:
rjl493456442 2024-01-23 04:05:18 +08:00 committed by GitHub
parent f55a10b64d
commit 78a3c32ef4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 446 additions and 365 deletions

View File

@ -185,6 +185,24 @@ func DefaultCacheConfigWithScheme(scheme string) *CacheConfig {
return &config return &config
} }
// txLookup is wrapper over transaction lookup along with the corresponding
// transaction object.
type txLookup struct {
lookup *rawdb.LegacyTxLookupEntry
transaction *types.Transaction
}
// TxIndexProgress is the struct describing the progress for transaction indexing.
type TxIndexProgress struct {
Indexed uint64 // number of blocks whose transactions are indexed
Remaining uint64 // number of blocks whose transactions are not indexed yet
}
// Done returns an indicator if the transaction indexing is finished.
func (prog TxIndexProgress) Done() bool {
return prog.Remaining == 0
}
// BlockChain represents the canonical chain given a database with a genesis // BlockChain represents the canonical chain given a database with a genesis
// block. The Blockchain manages chain imports, reverts, chain reorganisations. // block. The Blockchain manages chain imports, reverts, chain reorganisations.
// //
@ -242,16 +260,19 @@ type BlockChain struct {
bodyRLPCache *lru.Cache[common.Hash, rlp.RawValue] bodyRLPCache *lru.Cache[common.Hash, rlp.RawValue]
receiptsCache *lru.Cache[common.Hash, []*types.Receipt] receiptsCache *lru.Cache[common.Hash, []*types.Receipt]
blockCache *lru.Cache[common.Hash, *types.Block] blockCache *lru.Cache[common.Hash, *types.Block]
txLookupCache *lru.Cache[common.Hash, *rawdb.LegacyTxLookupEntry] txLookupCache *lru.Cache[common.Hash, txLookup]
// future blocks are blocks added for later processing // future blocks are blocks added for later processing
futureBlocks *lru.Cache[common.Hash, *types.Block] futureBlocks *lru.Cache[common.Hash, *types.Block]
wg sync.WaitGroup // wg sync.WaitGroup
quit chan struct{} // shutdown signal, closed in Stop. quit chan struct{} // shutdown signal, closed in Stop.
stopping atomic.Bool // false if chain is running, true when stopped stopping atomic.Bool // false if chain is running, true when stopped
procInterrupt atomic.Bool // interrupt signaler for block processing procInterrupt atomic.Bool // interrupt signaler for block processing
txIndexRunning bool // flag if the background tx indexer is activated
txIndexProgCh chan chan TxIndexProgress // chan for querying the progress of transaction indexing
engine consensus.Engine engine consensus.Engine
validator Validator // Block and state validator interface validator Validator // Block and state validator interface
prefetcher Prefetcher prefetcher Prefetcher
@ -297,8 +318,9 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
bodyRLPCache: lru.NewCache[common.Hash, rlp.RawValue](bodyCacheLimit), bodyRLPCache: lru.NewCache[common.Hash, rlp.RawValue](bodyCacheLimit),
receiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit), receiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit),
blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit), blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit),
txLookupCache: lru.NewCache[common.Hash, *rawdb.LegacyTxLookupEntry](txLookupCacheLimit), txLookupCache: lru.NewCache[common.Hash, txLookup](txLookupCacheLimit),
futureBlocks: lru.NewCache[common.Hash, *types.Block](maxFutureBlocks), futureBlocks: lru.NewCache[common.Hash, *types.Block](maxFutureBlocks),
txIndexProgCh: make(chan chan TxIndexProgress),
engine: engine, engine: engine,
vmConfig: vmConfig, vmConfig: vmConfig,
} }
@ -466,6 +488,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
// Start tx indexer/unindexer if required. // Start tx indexer/unindexer if required.
if txLookupLimit != nil { if txLookupLimit != nil {
bc.txLookupLimit = *txLookupLimit bc.txLookupLimit = *txLookupLimit
bc.txIndexRunning = true
bc.wg.Add(1) bc.wg.Add(1)
go bc.maintainTxIndex() go bc.maintainTxIndex()
@ -1155,14 +1178,13 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
// Ensure genesis is in ancients. // Ensure genesis is in ancients.
if first.NumberU64() == 1 { if first.NumberU64() == 1 {
if frozen, _ := bc.db.Ancients(); frozen == 0 { if frozen, _ := bc.db.Ancients(); frozen == 0 {
b := bc.genesisBlock
td := bc.genesisBlock.Difficulty() td := bc.genesisBlock.Difficulty()
writeSize, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{b}, []types.Receipts{nil}, td) writeSize, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{bc.genesisBlock}, []types.Receipts{nil}, td)
size += writeSize
if err != nil { if err != nil {
log.Error("Error writing genesis to ancients", "err", err) log.Error("Error writing genesis to ancients", "err", err)
return 0, err return 0, err
} }
size += writeSize
log.Info("Wrote genesis to ancients") log.Info("Wrote genesis to ancients")
} }
} }
@ -1176,44 +1198,11 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
// Write all chain data to ancients. // Write all chain data to ancients.
td := bc.GetTd(first.Hash(), first.NumberU64()) td := bc.GetTd(first.Hash(), first.NumberU64())
writeSize, err := rawdb.WriteAncientBlocks(bc.db, blockChain, receiptChain, td) writeSize, err := rawdb.WriteAncientBlocks(bc.db, blockChain, receiptChain, td)
size += writeSize
if err != nil { if err != nil {
log.Error("Error importing chain data to ancients", "err", err) log.Error("Error importing chain data to ancients", "err", err)
return 0, err return 0, err
} }
size += writeSize
// Write tx indices if any condition is satisfied:
// * If user requires to reserve all tx indices(txlookuplimit=0)
// * If all ancient tx indices are required to be reserved(txlookuplimit is even higher than ancientlimit)
// * If block number is large enough to be regarded as a recent block
// It means blocks below the ancientLimit-txlookupLimit won't be indexed.
//
// But if the `TxIndexTail` is not nil, e.g. Geth is initialized with
// an external ancient database, during the setup, blockchain will start
// a background routine to re-indexed all indices in [ancients - txlookupLimit, ancients)
// range. In this case, all tx indices of newly imported blocks should be
// generated.
batch := bc.db.NewBatch()
for i, block := range blockChain {
if bc.txLookupLimit == 0 || ancientLimit <= bc.txLookupLimit || block.NumberU64() >= ancientLimit-bc.txLookupLimit {
rawdb.WriteTxLookupEntriesByBlock(batch, block)
} else if rawdb.ReadTxIndexTail(bc.db) != nil {
rawdb.WriteTxLookupEntriesByBlock(batch, block)
}
stats.processed++
if batch.ValueSize() > ethdb.IdealBatchSize || i == len(blockChain)-1 {
size += int64(batch.ValueSize())
if err = batch.Write(); err != nil {
snapBlock := bc.CurrentSnapBlock().Number.Uint64()
if _, err := bc.db.TruncateHead(snapBlock + 1); err != nil {
log.Error("Can't truncate ancient store after failed insert", "err", err)
}
return 0, err
}
batch.Reset()
}
}
// Sync the ancient store explicitly to ensure all data has been flushed to disk. // Sync the ancient store explicitly to ensure all data has been flushed to disk.
if err := bc.db.Sync(); err != nil { if err := bc.db.Sync(); err != nil {
@ -1231,8 +1220,10 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
} }
// Delete block data from the main database. // Delete block data from the main database.
batch.Reset() var (
canonHashes := make(map[common.Hash]struct{}) batch = bc.db.NewBatch()
canonHashes = make(map[common.Hash]struct{})
)
for _, block := range blockChain { for _, block := range blockChain {
canonHashes[block.Hash()] = struct{}{} canonHashes[block.Hash()] = struct{}{}
if block.NumberU64() == 0 { if block.NumberU64() == 0 {
@ -1250,13 +1241,16 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
if err := batch.Write(); err != nil { if err := batch.Write(); err != nil {
return 0, err return 0, err
} }
stats.processed += int32(len(blockChain))
return 0, nil return 0, nil
} }
// writeLive writes blockchain and corresponding receipt chain into active store. // writeLive writes blockchain and corresponding receipt chain into active store.
writeLive := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) { writeLive := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
skipPresenceCheck := false var (
batch := bc.db.NewBatch() skipPresenceCheck = false
batch = bc.db.NewBatch()
)
for i, block := range blockChain { for i, block := range blockChain {
// Short circuit insertion if shutting down or processing failed // Short circuit insertion if shutting down or processing failed
if bc.insertStopped() { if bc.insertStopped() {
@ -1281,11 +1275,10 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
// Write all the data out into the database // Write all the data out into the database
rawdb.WriteBody(batch, block.Hash(), block.NumberU64(), block.Body()) rawdb.WriteBody(batch, block.Hash(), block.NumberU64(), block.Body())
rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receiptChain[i]) rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receiptChain[i])
rawdb.WriteTxLookupEntriesByBlock(batch, block) // Always write tx indices for live blocks, we assume they are needed
// Write everything belongs to the blocks into the database. So that // Write everything belongs to the blocks into the database. So that
// we can ensure all components of body is completed(body, receipts, // we can ensure all components of body is completed(body, receipts)
// tx indexes) // except transaction indexes(will be created once sync is finished).
if batch.ValueSize() >= ethdb.IdealBatchSize { if batch.ValueSize() >= ethdb.IdealBatchSize {
if err := batch.Write(); err != nil { if err := batch.Write(); err != nil {
return 0, err return 0, err
@ -1317,19 +1310,6 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
return n, err return n, err
} }
} }
// Write the tx index tail (block number from where we index) before write any live blocks
if len(liveBlocks) > 0 && liveBlocks[0].NumberU64() == ancientLimit+1 {
// The tx index tail can only be one of the following two options:
// * 0: all ancient blocks have been indexed
// * ancient-limit: the indices of blocks before ancient-limit are ignored
if tail := rawdb.ReadTxIndexTail(bc.db); tail == nil {
if bc.txLookupLimit == 0 || ancientLimit <= bc.txLookupLimit {
rawdb.WriteTxIndexTail(bc.db, 0)
} else {
rawdb.WriteTxIndexTail(bc.db, ancientLimit-bc.txLookupLimit)
}
}
}
if len(liveBlocks) > 0 { if len(liveBlocks) > 0 {
if n, err := writeLive(liveBlocks, liveReceipts); err != nil { if n, err := writeLive(liveBlocks, liveReceipts); err != nil {
if err == errInsertionInterrupted { if err == errInsertionInterrupted {
@ -1338,13 +1318,14 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
return n, err return n, err
} }
} }
var (
head := blockChain[len(blockChain)-1] head = blockChain[len(blockChain)-1]
context := []interface{}{ context = []interface{}{
"count", stats.processed, "elapsed", common.PrettyDuration(time.Since(start)), "count", stats.processed, "elapsed", common.PrettyDuration(time.Since(start)),
"number", head.Number(), "hash", head.Hash(), "age", common.PrettyAge(time.Unix(int64(head.Time()), 0)), "number", head.Number(), "hash", head.Hash(), "age", common.PrettyAge(time.Unix(int64(head.Time()), 0)),
"size", common.StorageSize(size), "size", common.StorageSize(size),
} }
)
if stats.ignored > 0 { if stats.ignored > 0 {
context = append(context, []interface{}{"ignored", stats.ignored}...) context = append(context, []interface{}{"ignored", stats.ignored}...)
} }
@ -1360,7 +1341,6 @@ func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (e
if bc.insertStopped() { if bc.insertStopped() {
return errInsertionInterrupted return errInsertionInterrupted
} }
batch := bc.db.NewBatch() batch := bc.db.NewBatch()
rawdb.WriteTd(batch, block.Hash(), block.NumberU64(), td) rawdb.WriteTd(batch, block.Hash(), block.NumberU64(), td)
rawdb.WriteBlock(batch, block) rawdb.WriteBlock(batch, block)
@ -2427,23 +2407,24 @@ func (bc *BlockChain) skipBlock(err error, it *insertIterator) bool {
func (bc *BlockChain) indexBlocks(tail *uint64, head uint64, done chan struct{}) { func (bc *BlockChain) indexBlocks(tail *uint64, head uint64, done chan struct{}) {
defer func() { close(done) }() defer func() { close(done) }()
// If head is 0, it means the chain is just initialized and no blocks are inserted, // If head is 0, it means the chain is just initialized and no blocks are
// so don't need to indexing anything. // inserted, so don't need to index anything.
if head == 0 { if head == 0 {
return return
} }
// The tail flag is not existent, it means the node is just initialized // The tail flag is not existent, it means the node is just initialized
// and all blocks(may from ancient store) are not indexed yet. // and all blocks in the chain (part of them may from ancient store) are
// not indexed yet, index the chain according to the configuration then.
if tail == nil { if tail == nil {
from := uint64(0) from := uint64(0)
if bc.txLookupLimit != 0 && head >= bc.txLookupLimit { if bc.txLookupLimit != 0 && head >= bc.txLookupLimit {
from = head - bc.txLookupLimit + 1 from = head - bc.txLookupLimit + 1
} }
rawdb.IndexTransactions(bc.db, from, head+1, bc.quit) rawdb.IndexTransactions(bc.db, from, head+1, bc.quit, true)
return return
} }
// The tail flag is existent, but the whole chain is required to be indexed. // The tail flag is existent (which means indexes in [tail, head] should be
// present), while the whole chain are requested for indexing.
if bc.txLookupLimit == 0 || head < bc.txLookupLimit { if bc.txLookupLimit == 0 || head < bc.txLookupLimit {
if *tail > 0 { if *tail > 0 {
// It can happen when chain is rewound to a historical point which // It can happen when chain is rewound to a historical point which
@ -2453,17 +2434,58 @@ func (bc *BlockChain) indexBlocks(tail *uint64, head uint64, done chan struct{})
if end > head+1 { if end > head+1 {
end = head + 1 end = head + 1
} }
rawdb.IndexTransactions(bc.db, 0, end, bc.quit) rawdb.IndexTransactions(bc.db, 0, end, bc.quit, true)
} }
return return
} }
// Update the transaction index to the new chain state // The tail flag is existent, adjust the index range according to configuration
// and latest head.
if head-bc.txLookupLimit+1 < *tail { if head-bc.txLookupLimit+1 < *tail {
// Reindex a part of missing indices and rewind index tail to HEAD-limit // Reindex a part of missing indices and rewind index tail to HEAD-limit
rawdb.IndexTransactions(bc.db, head-bc.txLookupLimit+1, *tail, bc.quit) rawdb.IndexTransactions(bc.db, head-bc.txLookupLimit+1, *tail, bc.quit, true)
} else { } else {
// Unindex a part of stale indices and forward index tail to HEAD-limit // Unindex a part of stale indices and forward index tail to HEAD-limit
rawdb.UnindexTransactions(bc.db, *tail, head-bc.txLookupLimit+1, bc.quit) rawdb.UnindexTransactions(bc.db, *tail, head-bc.txLookupLimit+1, bc.quit, false)
}
}
// reportTxIndexProgress returns the tx indexing progress.
func (bc *BlockChain) reportTxIndexProgress(head uint64) TxIndexProgress {
var (
remaining uint64
tail = rawdb.ReadTxIndexTail(bc.db)
)
total := bc.txLookupLimit
if bc.txLookupLimit == 0 {
total = head + 1 // genesis included
}
var indexed uint64
if tail != nil {
indexed = head - *tail + 1
}
// The value of indexed might be larger than total if some blocks need
// to be unindexed, avoiding a negative remaining.
if indexed < total {
remaining = total - indexed
}
return TxIndexProgress{
Indexed: indexed,
Remaining: remaining,
}
}
// TxIndexProgress retrieves the tx indexing progress, or an error if the
// background tx indexer is not activated or already stopped.
func (bc *BlockChain) TxIndexProgress() (TxIndexProgress, error) {
if !bc.txIndexRunning {
return TxIndexProgress{}, errors.New("tx indexer is not activated")
}
ch := make(chan TxIndexProgress, 1)
select {
case bc.txIndexProgCh <- ch:
return <-ch, nil
case <-bc.quit:
return TxIndexProgress{}, errors.New("blockchain is closed")
} }
} }
@ -2483,6 +2505,7 @@ func (bc *BlockChain) maintainTxIndex() {
// Listening to chain events and manipulate the transaction indexes. // Listening to chain events and manipulate the transaction indexes.
var ( var (
done chan struct{} // Non-nil if background unindexing or reindexing routine is active. done chan struct{} // Non-nil if background unindexing or reindexing routine is active.
lastHead uint64 // The latest announced chain head (whose tx indexes are assumed created)
headCh = make(chan ChainHeadEvent, 1) // Buffered to avoid locking up the event feed headCh = make(chan ChainHeadEvent, 1) // Buffered to avoid locking up the event feed
) )
sub := bc.SubscribeChainHeadEvent(headCh) sub := bc.SubscribeChainHeadEvent(headCh)
@ -2492,14 +2515,14 @@ func (bc *BlockChain) maintainTxIndex() {
defer sub.Unsubscribe() defer sub.Unsubscribe()
log.Info("Initialized transaction indexer", "limit", bc.TxLookupLimit()) log.Info("Initialized transaction indexer", "limit", bc.TxLookupLimit())
// Launch the initial processing if chain is not empty. This step is // Launch the initial processing if chain is not empty (head != genesis).
// useful in these scenarios that chain has no progress and indexer // This step is useful in these scenarios that chain has no progress and
// is never triggered. // indexer is never triggered.
if head := rawdb.ReadHeadBlock(bc.db); head != nil { if head := rawdb.ReadHeadBlock(bc.db); head != nil && head.Number().Uint64() != 0 {
done = make(chan struct{}) done = make(chan struct{})
lastHead = head.Number().Uint64()
go bc.indexBlocks(rawdb.ReadTxIndexTail(bc.db), head.NumberU64(), done) go bc.indexBlocks(rawdb.ReadTxIndexTail(bc.db), head.NumberU64(), done)
} }
for { for {
select { select {
case head := <-headCh: case head := <-headCh:
@ -2507,8 +2530,11 @@ func (bc *BlockChain) maintainTxIndex() {
done = make(chan struct{}) done = make(chan struct{})
go bc.indexBlocks(rawdb.ReadTxIndexTail(bc.db), head.Block.NumberU64(), done) go bc.indexBlocks(rawdb.ReadTxIndexTail(bc.db), head.Block.NumberU64(), done)
} }
lastHead = head.Block.NumberU64()
case <-done: case <-done:
done = nil done = nil
case ch := <-bc.txIndexProgCh:
ch <- bc.reportTxIndexProgress(lastHead)
case <-bc.quit: case <-bc.quit:
if done != nil { if done != nil {
log.Info("Waiting background transaction indexer to exit") log.Info("Waiting background transaction indexer to exit")

View File

@ -17,6 +17,7 @@
package core package core
import ( import (
"errors"
"math/big" "math/big"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -254,20 +255,46 @@ func (bc *BlockChain) GetAncestor(hash common.Hash, number, ancestor uint64, max
return bc.hc.GetAncestor(hash, number, ancestor, maxNonCanonical) return bc.hc.GetAncestor(hash, number, ancestor, maxNonCanonical)
} }
// GetTransactionLookup retrieves the lookup associate with the given transaction // GetTransactionLookup retrieves the lookup along with the transaction
// hash from the cache or database. // itself associate with the given transaction hash.
func (bc *BlockChain) GetTransactionLookup(hash common.Hash) *rawdb.LegacyTxLookupEntry { //
// An error will be returned if the transaction is not found, and background
// indexing for transactions is still in progress. The transaction might be
// reachable shortly once it's indexed.
//
// A null will be returned in the transaction is not found and background
// transaction indexing is already finished. The transaction is not existent
// from the node's perspective.
func (bc *BlockChain) GetTransactionLookup(hash common.Hash) (*rawdb.LegacyTxLookupEntry, *types.Transaction, error) {
// Short circuit if the txlookup already in the cache, retrieve otherwise // Short circuit if the txlookup already in the cache, retrieve otherwise
if lookup, exist := bc.txLookupCache.Get(hash); exist { if item, exist := bc.txLookupCache.Get(hash); exist {
return lookup return item.lookup, item.transaction, nil
} }
tx, blockHash, blockNumber, txIndex := rawdb.ReadTransaction(bc.db, hash) tx, blockHash, blockNumber, txIndex := rawdb.ReadTransaction(bc.db, hash)
if tx == nil { if tx == nil {
return nil progress, err := bc.TxIndexProgress()
if err != nil {
return nil, nil, nil
} }
lookup := &rawdb.LegacyTxLookupEntry{BlockHash: blockHash, BlockIndex: blockNumber, Index: txIndex} // The transaction indexing is not finished yet, returning an
bc.txLookupCache.Add(hash, lookup) // error to explicitly indicate it.
return lookup if !progress.Done() {
return nil, nil, errors.New("transaction indexing still in progress")
}
// The transaction is already indexed, the transaction is either
// not existent or not in the range of index, returning null.
return nil, nil, nil
}
lookup := &rawdb.LegacyTxLookupEntry{
BlockHash: blockHash,
BlockIndex: blockNumber,
Index: txIndex,
}
bc.txLookupCache.Add(hash, txLookup{
lookup: lookup,
transaction: tx,
})
return lookup, tx, nil
} }
// GetTd retrieves a block's total difficulty in the canonical chain from the // GetTd retrieves a block's total difficulty in the canonical chain from the

View File

@ -2822,91 +2822,6 @@ func TestTransactionIndices(t *testing.T) {
} }
} }
func TestSkipStaleTxIndicesInSnapSync(t *testing.T) {
testSkipStaleTxIndicesInSnapSync(t, rawdb.HashScheme)
testSkipStaleTxIndicesInSnapSync(t, rawdb.PathScheme)
}
func testSkipStaleTxIndicesInSnapSync(t *testing.T, scheme string) {
// Configure and generate a sample block chain
var (
key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
address = crypto.PubkeyToAddress(key.PublicKey)
funds = big.NewInt(100000000000000000)
gspec = &Genesis{Config: params.TestChainConfig, Alloc: GenesisAlloc{address: {Balance: funds}}}
signer = types.LatestSigner(gspec.Config)
)
_, blocks, receipts := GenerateChainWithGenesis(gspec, ethash.NewFaker(), 128, func(i int, block *BlockGen) {
tx, err := types.SignTx(types.NewTransaction(block.TxNonce(address), common.Address{0x00}, big.NewInt(1000), params.TxGas, block.header.BaseFee, nil), signer, key)
if err != nil {
panic(err)
}
block.AddTx(tx)
})
check := func(tail *uint64, chain *BlockChain) {
stored := rawdb.ReadTxIndexTail(chain.db)
if tail == nil && stored != nil {
t.Fatalf("Oldest indexded block mismatch, want nil, have %d", *stored)
}
if tail != nil && *stored != *tail {
t.Fatalf("Oldest indexded block mismatch, want %d, have %d", *tail, *stored)
}
if tail != nil {
for i := *tail; i <= chain.CurrentBlock().Number.Uint64(); i++ {
block := rawdb.ReadBlock(chain.db, rawdb.ReadCanonicalHash(chain.db, i), i)
if block.Transactions().Len() == 0 {
continue
}
for _, tx := range block.Transactions() {
if index := rawdb.ReadTxLookupEntry(chain.db, tx.Hash()); index == nil {
t.Fatalf("Miss transaction indice, number %d hash %s", i, tx.Hash().Hex())
}
}
}
for i := uint64(0); i < *tail; i++ {
block := rawdb.ReadBlock(chain.db, rawdb.ReadCanonicalHash(chain.db, i), i)
if block.Transactions().Len() == 0 {
continue
}
for _, tx := range block.Transactions() {
if index := rawdb.ReadTxLookupEntry(chain.db, tx.Hash()); index != nil {
t.Fatalf("Transaction indice should be deleted, number %d hash %s", i, tx.Hash().Hex())
}
}
}
}
}
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
defer ancientDb.Close()
// Import all blocks into ancient db, only HEAD-32 indices are kept.
l := uint64(32)
chain, err := NewBlockChain(ancientDb, DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{}, nil, &l)
if err != nil {
t.Fatalf("failed to create tester chain: %v", err)
}
defer chain.Stop()
headers := make([]*types.Header, len(blocks))
for i, block := range blocks {
headers[i] = block.Header()
}
if n, err := chain.InsertHeaderChain(headers); err != nil {
t.Fatalf("failed to insert header %d: %v", n, err)
}
// The indices before ancient-N(32) should be ignored. After that all blocks should be indexed.
if n, err := chain.InsertReceiptChain(blocks, receipts, 64); err != nil {
t.Fatalf("block %d: failed to insert into chain: %v", n, err)
}
tail := uint64(32)
check(&tail, chain)
}
// Benchmarks large blocks with value transfers to non-existing accounts // Benchmarks large blocks with value transfers to non-existing accounts
func benchmarkLargeNumberOfValueToNonexisting(b *testing.B, numTxs, numBlocks int, recipientFn func(uint64) common.Address, dataFn func(uint64) []byte) { func benchmarkLargeNumberOfValueToNonexisting(b *testing.B, numTxs, numBlocks int, recipientFn func(uint64) common.Address, dataFn func(uint64) []byte) {
var ( var (
@ -4160,6 +4075,12 @@ func TestTxIndexer(t *testing.T) {
} }
verifyRange(db, *tail, 128, true) verifyRange(db, *tail, 128, true)
} }
verifyProgress := func(chain *BlockChain) {
prog := chain.reportTxIndexProgress(128)
if !prog.Done() {
t.Fatalf("Expect fully indexed")
}
}
var cases = []struct { var cases = []struct {
limitA uint64 limitA uint64
@ -4289,19 +4210,23 @@ func TestTxIndexer(t *testing.T) {
chain, _ := NewBlockChain(db, nil, gspec, nil, engine, vm.Config{}, nil, &c.limitA) chain, _ := NewBlockChain(db, nil, gspec, nil, engine, vm.Config{}, nil, &c.limitA)
chain.indexBlocks(nil, 128, make(chan struct{})) chain.indexBlocks(nil, 128, make(chan struct{}))
verify(db, c.tailA) verify(db, c.tailA)
verifyProgress(chain)
chain.SetTxLookupLimit(c.limitB) chain.SetTxLookupLimit(c.limitB)
chain.indexBlocks(rawdb.ReadTxIndexTail(db), 128, make(chan struct{})) chain.indexBlocks(rawdb.ReadTxIndexTail(db), 128, make(chan struct{}))
verify(db, c.tailB) verify(db, c.tailB)
verifyProgress(chain)
chain.SetTxLookupLimit(c.limitC) chain.SetTxLookupLimit(c.limitC)
chain.indexBlocks(rawdb.ReadTxIndexTail(db), 128, make(chan struct{})) chain.indexBlocks(rawdb.ReadTxIndexTail(db), 128, make(chan struct{}))
verify(db, c.tailC) verify(db, c.tailC)
verifyProgress(chain)
// Recover all indexes // Recover all indexes
chain.SetTxLookupLimit(0) chain.SetTxLookupLimit(0)
chain.indexBlocks(rawdb.ReadTxIndexTail(db), 128, make(chan struct{})) chain.indexBlocks(rawdb.ReadTxIndexTail(db), 128, make(chan struct{}))
verify(db, 0) verify(db, 0)
verifyProgress(chain)
chain.Stop() chain.Stop()
db.Close() db.Close()

View File

@ -278,23 +278,6 @@ func WriteTxIndexTail(db ethdb.KeyValueWriter, number uint64) {
} }
} }
// ReadFastTxLookupLimit retrieves the tx lookup limit used in fast sync.
func ReadFastTxLookupLimit(db ethdb.KeyValueReader) *uint64 {
data, _ := db.Get(fastTxLookupLimitKey)
if len(data) != 8 {
return nil
}
number := binary.BigEndian.Uint64(data)
return &number
}
// WriteFastTxLookupLimit stores the txlookup limit used in fast sync into database.
func WriteFastTxLookupLimit(db ethdb.KeyValueWriter, number uint64) {
if err := db.Put(fastTxLookupLimitKey, encodeBlockNumber(number)); err != nil {
log.Crit("Failed to store transaction lookup limit for fast sync", "err", err)
}
}
// ReadHeaderRange returns the rlp-encoded headers, starting at 'number', and going // ReadHeaderRange returns the rlp-encoded headers, starting at 'number', and going
// backwards towards genesis. This method assumes that the caller already has // backwards towards genesis. This method assumes that the caller already has
// placed a cap on count, to prevent DoS issues. // placed a cap on count, to prevent DoS issues.

View File

@ -178,7 +178,7 @@ func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool
// //
// There is a passed channel, the whole procedure will be interrupted if any // There is a passed channel, the whole procedure will be interrupted if any
// signal received. // signal received.
func indexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) { func indexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool, report bool) {
// short circuit for invalid range // short circuit for invalid range
if from >= to { if from >= to {
return return
@ -188,13 +188,13 @@ func indexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan
batch = db.NewBatch() batch = db.NewBatch()
start = time.Now() start = time.Now()
logged = start.Add(-7 * time.Second) logged = start.Add(-7 * time.Second)
// Since we iterate in reverse, we expect the first number to come // Since we iterate in reverse, we expect the first number to come
// in to be [to-1]. Therefore, setting lastNum to means that the // in to be [to-1]. Therefore, setting lastNum to means that the
// prqueue gap-evaluation will work correctly // queue gap-evaluation will work correctly
lastNum = to lastNum = to
queue = prque.New[int64, *blockTxHashes](nil) queue = prque.New[int64, *blockTxHashes](nil)
// for stats reporting blocks, txs = 0, 0 // for stats reporting
blocks, txs = 0, 0
) )
for chanDelivery := range hashesCh { for chanDelivery := range hashesCh {
// Push the delivery into the queue and process contiguous ranges. // Push the delivery into the queue and process contiguous ranges.
@ -240,11 +240,15 @@ func indexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan
log.Crit("Failed writing batch to db", "error", err) log.Crit("Failed writing batch to db", "error", err)
return return
} }
logger := log.Debug
if report {
logger = log.Info
}
select { select {
case <-interrupt: case <-interrupt:
log.Debug("Transaction indexing interrupted", "blocks", blocks, "txs", txs, "tail", lastNum, "elapsed", common.PrettyDuration(time.Since(start))) logger("Transaction indexing interrupted", "blocks", blocks, "txs", txs, "tail", lastNum, "elapsed", common.PrettyDuration(time.Since(start)))
default: default:
log.Debug("Indexed transactions", "blocks", blocks, "txs", txs, "tail", lastNum, "elapsed", common.PrettyDuration(time.Since(start))) logger("Indexed transactions", "blocks", blocks, "txs", txs, "tail", lastNum, "elapsed", common.PrettyDuration(time.Since(start)))
} }
} }
@ -257,20 +261,20 @@ func indexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan
// //
// There is a passed channel, the whole procedure will be interrupted if any // There is a passed channel, the whole procedure will be interrupted if any
// signal received. // signal received.
func IndexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}) { func IndexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, report bool) {
indexTransactions(db, from, to, interrupt, nil) indexTransactions(db, from, to, interrupt, nil, report)
} }
// indexTransactionsForTesting is the internal debug version with an additional hook. // indexTransactionsForTesting is the internal debug version with an additional hook.
func indexTransactionsForTesting(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) { func indexTransactionsForTesting(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) {
indexTransactions(db, from, to, interrupt, hook) indexTransactions(db, from, to, interrupt, hook, false)
} }
// unindexTransactions removes txlookup indices of the specified block range. // unindexTransactions removes txlookup indices of the specified block range.
// //
// There is a passed channel, the whole procedure will be interrupted if any // There is a passed channel, the whole procedure will be interrupted if any
// signal received. // signal received.
func unindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) { func unindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool, report bool) {
// short circuit for invalid range // short circuit for invalid range
if from >= to { if from >= to {
return return
@ -280,12 +284,12 @@ func unindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt ch
batch = db.NewBatch() batch = db.NewBatch()
start = time.Now() start = time.Now()
logged = start.Add(-7 * time.Second) logged = start.Add(-7 * time.Second)
// we expect the first number to come in to be [from]. Therefore, setting // we expect the first number to come in to be [from]. Therefore, setting
// nextNum to from means that the prqueue gap-evaluation will work correctly // nextNum to from means that the queue gap-evaluation will work correctly
nextNum = from nextNum = from
queue = prque.New[int64, *blockTxHashes](nil) queue = prque.New[int64, *blockTxHashes](nil)
// for stats reporting blocks, txs = 0, 0 // for stats reporting
blocks, txs = 0, 0
) )
// Otherwise spin up the concurrent iterator and unindexer // Otherwise spin up the concurrent iterator and unindexer
for delivery := range hashesCh { for delivery := range hashesCh {
@ -332,11 +336,15 @@ func unindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt ch
log.Crit("Failed writing batch to db", "error", err) log.Crit("Failed writing batch to db", "error", err)
return return
} }
logger := log.Debug
if report {
logger = log.Info
}
select { select {
case <-interrupt: case <-interrupt:
log.Debug("Transaction unindexing interrupted", "blocks", blocks, "txs", txs, "tail", to, "elapsed", common.PrettyDuration(time.Since(start))) logger("Transaction unindexing interrupted", "blocks", blocks, "txs", txs, "tail", to, "elapsed", common.PrettyDuration(time.Since(start)))
default: default:
log.Debug("Unindexed transactions", "blocks", blocks, "txs", txs, "tail", to, "elapsed", common.PrettyDuration(time.Since(start))) logger("Unindexed transactions", "blocks", blocks, "txs", txs, "tail", to, "elapsed", common.PrettyDuration(time.Since(start)))
} }
} }
@ -345,11 +353,11 @@ func unindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt ch
// //
// There is a passed channel, the whole procedure will be interrupted if any // There is a passed channel, the whole procedure will be interrupted if any
// signal received. // signal received.
func UnindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}) { func UnindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, report bool) {
unindexTransactions(db, from, to, interrupt, nil) unindexTransactions(db, from, to, interrupt, nil, report)
} }
// unindexTransactionsForTesting is the internal debug version with an additional hook. // unindexTransactionsForTesting is the internal debug version with an additional hook.
func unindexTransactionsForTesting(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) { func unindexTransactionsForTesting(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) {
unindexTransactions(db, from, to, interrupt, hook) unindexTransactions(db, from, to, interrupt, hook, false)
} }

View File

@ -162,18 +162,18 @@ func TestIndexTransactions(t *testing.T) {
t.Fatalf("Transaction tail mismatch") t.Fatalf("Transaction tail mismatch")
} }
} }
IndexTransactions(chainDb, 5, 11, nil) IndexTransactions(chainDb, 5, 11, nil, false)
verify(5, 11, true, 5) verify(5, 11, true, 5)
verify(0, 5, false, 5) verify(0, 5, false, 5)
IndexTransactions(chainDb, 0, 5, nil) IndexTransactions(chainDb, 0, 5, nil, false)
verify(0, 11, true, 0) verify(0, 11, true, 0)
UnindexTransactions(chainDb, 0, 5, nil) UnindexTransactions(chainDb, 0, 5, nil, false)
verify(5, 11, true, 5) verify(5, 11, true, 5)
verify(0, 5, false, 5) verify(0, 5, false, 5)
UnindexTransactions(chainDb, 5, 11, nil) UnindexTransactions(chainDb, 5, 11, nil, false)
verify(0, 11, false, 11) verify(0, 11, false, 11)
// Testing corner cases // Testing corner cases
@ -190,7 +190,7 @@ func TestIndexTransactions(t *testing.T) {
}) })
verify(9, 11, true, 9) verify(9, 11, true, 9)
verify(0, 9, false, 9) verify(0, 9, false, 9)
IndexTransactions(chainDb, 0, 9, nil) IndexTransactions(chainDb, 0, 9, nil, false)
signal = make(chan struct{}) signal = make(chan struct{})
var once2 sync.Once var once2 sync.Once

View File

@ -657,7 +657,6 @@ func ReadChainMetadata(db ethdb.KeyValueStore) [][]string {
{"snapshotRecoveryNumber", pp(ReadSnapshotRecoveryNumber(db))}, {"snapshotRecoveryNumber", pp(ReadSnapshotRecoveryNumber(db))},
{"snapshotRoot", fmt.Sprintf("%v", ReadSnapshotRoot(db))}, {"snapshotRoot", fmt.Sprintf("%v", ReadSnapshotRoot(db))},
{"txIndexTail", pp(ReadTxIndexTail(db))}, {"txIndexTail", pp(ReadTxIndexTail(db))},
{"fastTxLookupLimit", pp(ReadFastTxLookupLimit(db))},
} }
if b := ReadSkeletonSyncStatus(db); b != nil { if b := ReadSkeletonSyncStatus(db); b != nil {
data = append(data, []string{"SkeletonSyncStatus", string(b)}) data = append(data, []string{"SkeletonSyncStatus", string(b)})

View File

@ -80,6 +80,8 @@ var (
txIndexTailKey = []byte("TransactionIndexTail") txIndexTailKey = []byte("TransactionIndexTail")
// fastTxLookupLimitKey tracks the transaction lookup limit during fast sync. // fastTxLookupLimitKey tracks the transaction lookup limit during fast sync.
// This flag is deprecated, it's kept to avoid reporting errors when inspect
// database.
fastTxLookupLimitKey = []byte("FastTransactionLookupLimit") fastTxLookupLimitKey = []byte("FastTransactionLookupLimit")
// badBlockKey tracks the list of bad blocks seen by local // badBlockKey tracks the list of bad blocks seen by local

View File

@ -308,9 +308,25 @@ func (b *EthAPIBackend) GetPoolTransaction(hash common.Hash) *types.Transaction
return b.eth.txPool.Get(hash) return b.eth.txPool.Get(hash)
} }
func (b *EthAPIBackend) GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) { // GetTransaction retrieves the lookup along with the transaction itself associate
tx, blockHash, blockNumber, index := rawdb.ReadTransaction(b.eth.ChainDb(), txHash) // with the given transaction hash.
return tx, blockHash, blockNumber, index, nil //
// An error will be returned if the transaction is not found, and background
// indexing for transactions is still in progress. The error is used to indicate the
// scenario explicitly that the transaction might be reachable shortly.
//
// A null will be returned in the transaction is not found and background transaction
// indexing is already finished. The transaction is not existent from the perspective
// of node.
func (b *EthAPIBackend) GetTransaction(ctx context.Context, txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64, error) {
lookup, tx, err := b.eth.blockchain.GetTransactionLookup(txHash)
if err != nil {
return false, nil, common.Hash{}, 0, 0, err
}
if lookup == nil || tx == nil {
return false, nil, common.Hash{}, 0, 0, nil
}
return true, tx, lookup.BlockHash, lookup.BlockIndex, lookup.Index, nil
} }
func (b *EthAPIBackend) GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) { func (b *EthAPIBackend) GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) {
@ -338,7 +354,12 @@ func (b *EthAPIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.S
} }
func (b *EthAPIBackend) SyncProgress() ethereum.SyncProgress { func (b *EthAPIBackend) SyncProgress() ethereum.SyncProgress {
return b.eth.Downloader().Progress() prog := b.eth.Downloader().Progress()
if txProg, err := b.eth.blockchain.TxIndexProgress(); err == nil {
prog.TxIndexFinishedBlocks = txProg.Indexed
prog.TxIndexRemainingBlocks = txProg.Remaining
}
return prog
} }
func (b *EthAPIBackend) SuggestGasTipCap(ctx context.Context) (*big.Int, error) { func (b *EthAPIBackend) SuggestGasTipCap(ctx context.Context) (*big.Int, error) {

View File

@ -322,7 +322,7 @@ func (s *Ethereum) APIs() []rpc.API {
Service: NewMinerAPI(s), Service: NewMinerAPI(s),
}, { }, {
Namespace: "eth", Namespace: "eth",
Service: downloader.NewDownloaderAPI(s.handler.downloader, s.eventMux), Service: downloader.NewDownloaderAPI(s.handler.downloader, s.blockchain, s.eventMux),
}, { }, {
Namespace: "admin", Namespace: "admin",
Service: NewAdminAPI(s), Service: NewAdminAPI(s),

View File

@ -19,16 +19,20 @@ package downloader
import ( import (
"context" "context"
"sync" "sync"
"time"
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
) )
// DownloaderAPI provides an API which gives information about the current synchronisation status. // DownloaderAPI provides an API which gives information about the current
// It offers only methods that operates on data that can be available to anyone without security risks. // synchronisation status. It offers only methods that operates on data that
// can be available to anyone without security risks.
type DownloaderAPI struct { type DownloaderAPI struct {
d *Downloader d *Downloader
chain *core.BlockChain
mux *event.TypeMux mux *event.TypeMux
installSyncSubscription chan chan interface{} installSyncSubscription chan chan interface{}
uninstallSyncSubscription chan *uninstallSyncSubscriptionRequest uninstallSyncSubscription chan *uninstallSyncSubscriptionRequest
@ -38,31 +42,57 @@ type DownloaderAPI struct {
// listens for events from the downloader through the global event mux. In case it receives one of // listens for events from the downloader through the global event mux. In case it receives one of
// these events it broadcasts it to all syncing subscriptions that are installed through the // these events it broadcasts it to all syncing subscriptions that are installed through the
// installSyncSubscription channel. // installSyncSubscription channel.
func NewDownloaderAPI(d *Downloader, m *event.TypeMux) *DownloaderAPI { func NewDownloaderAPI(d *Downloader, chain *core.BlockChain, m *event.TypeMux) *DownloaderAPI {
api := &DownloaderAPI{ api := &DownloaderAPI{
d: d, d: d,
chain: chain,
mux: m, mux: m,
installSyncSubscription: make(chan chan interface{}), installSyncSubscription: make(chan chan interface{}),
uninstallSyncSubscription: make(chan *uninstallSyncSubscriptionRequest), uninstallSyncSubscription: make(chan *uninstallSyncSubscriptionRequest),
} }
go api.eventLoop() go api.eventLoop()
return api return api
} }
// eventLoop runs a loop until the event mux closes. It will install and uninstall new // eventLoop runs a loop until the event mux closes. It will install and uninstall
// sync subscriptions and broadcasts sync status updates to the installed sync subscriptions. // new sync subscriptions and broadcasts sync status updates to the installed sync
// subscriptions.
//
// The sync status pushed to subscriptions can be a stream like:
// >>> {Syncing: true, Progress: {...}}
// >>> {false}
//
// If the node is already synced up, then only a single event subscribers will
// receive is {false}.
func (api *DownloaderAPI) eventLoop() { func (api *DownloaderAPI) eventLoop() {
var ( var (
sub = api.mux.Subscribe(StartEvent{}, DoneEvent{}, FailedEvent{}) sub = api.mux.Subscribe(StartEvent{})
syncSubscriptions = make(map[chan interface{}]struct{}) syncSubscriptions = make(map[chan interface{}]struct{})
checkInterval = time.Second * 60
checkTimer = time.NewTimer(checkInterval)
// status flags
started bool
done bool
getProgress = func() ethereum.SyncProgress {
prog := api.d.Progress()
if txProg, err := api.chain.TxIndexProgress(); err == nil {
prog.TxIndexFinishedBlocks = txProg.Indexed
prog.TxIndexRemainingBlocks = txProg.Remaining
}
return prog
}
) )
defer checkTimer.Stop()
for { for {
select { select {
case i := <-api.installSyncSubscription: case i := <-api.installSyncSubscription:
syncSubscriptions[i] = struct{}{} syncSubscriptions[i] = struct{}{}
if done {
i <- false
}
case u := <-api.uninstallSyncSubscription: case u := <-api.uninstallSyncSubscription:
delete(syncSubscriptions, u.c) delete(syncSubscriptions, u.c)
close(u.uninstalled) close(u.uninstalled)
@ -70,21 +100,31 @@ func (api *DownloaderAPI) eventLoop() {
if event == nil { if event == nil {
return return
} }
var notification interface{}
switch event.Data.(type) { switch event.Data.(type) {
case StartEvent: case StartEvent:
notification = &SyncingResult{ started = true
}
case <-checkTimer.C:
if !started {
checkTimer.Reset(checkInterval)
continue
}
prog := getProgress()
if !prog.Done() {
notification := &SyncingResult{
Syncing: true, Syncing: true,
Status: api.d.Progress(), Status: prog,
} }
case DoneEvent, FailedEvent:
notification = false
}
// broadcast
for c := range syncSubscriptions { for c := range syncSubscriptions {
c <- notification c <- notification
} }
checkTimer.Reset(checkInterval)
continue
}
for c := range syncSubscriptions {
c <- false
}
done = true
} }
} }
} }

View File

@ -228,24 +228,6 @@ func (cs *chainSyncer) startSync(op *chainSyncOp) {
// doSync synchronizes the local blockchain with a remote peer. // doSync synchronizes the local blockchain with a remote peer.
func (h *handler) doSync(op *chainSyncOp) error { func (h *handler) doSync(op *chainSyncOp) error {
if op.mode == downloader.SnapSync {
// Before launch the snap sync, we have to ensure user uses the same
// txlookup limit.
// The main concern here is: during the snap sync Geth won't index the
// block(generate tx indices) before the HEAD-limit. But if user changes
// the limit in the next snap sync(e.g. user kill Geth manually and
// restart) then it will be hard for Geth to figure out the oldest block
// has been indexed. So here for the user-experience wise, it's non-optimal
// that user can't change limit during the snap sync. If changed, Geth
// will just blindly use the original one.
limit := h.chain.TxLookupLimit()
if stored := rawdb.ReadFastTxLookupLimit(h.database); stored == nil {
rawdb.WriteFastTxLookupLimit(h.database, limit)
} else if *stored != limit {
h.chain.SetTxLookupLimit(*stored)
log.Warn("Update txLookup limit", "provided", limit, "updated", *stored)
}
}
// Run the sync cycle, and disable snap sync if we're past the pivot block // Run the sync cycle, and disable snap sync if we're past the pivot block
err := h.downloader.LegacySync(op.peer.ID(), op.head, op.td, h.chain.Config().TerminalTotalDifficulty, op.mode) err := h.downloader.LegacySync(op.peer.ID(), op.head, op.td, h.chain.Config().TerminalTotalDifficulty, op.mode)
if err != nil { if err != nil {

View File

@ -80,7 +80,7 @@ type Backend interface {
HeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Header, error) HeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Header, error)
BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error)
BlockByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Block, error) BlockByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Block, error)
GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) GetTransaction(ctx context.Context, txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64, error)
RPCGasCap() uint64 RPCGasCap() uint64
ChainConfig() *params.ChainConfig ChainConfig() *params.ChainConfig
Engine() consensus.Engine Engine() consensus.Engine
@ -826,12 +826,12 @@ func containsTx(block *types.Block, hash common.Hash) bool {
// TraceTransaction returns the structured logs created during the execution of EVM // TraceTransaction returns the structured logs created during the execution of EVM
// and returns them as a JSON object. // and returns them as a JSON object.
func (api *API) TraceTransaction(ctx context.Context, hash common.Hash, config *TraceConfig) (interface{}, error) { func (api *API) TraceTransaction(ctx context.Context, hash common.Hash, config *TraceConfig) (interface{}, error) {
tx, blockHash, blockNumber, index, err := api.backend.GetTransaction(ctx, hash) found, _, blockHash, blockNumber, index, err := api.backend.GetTransaction(ctx, hash)
if err != nil { if err != nil {
return nil, err return nil, ethapi.NewTxIndexingError()
} }
// Only mined txes are supported // Only mined txes are supported
if tx == nil { if !found {
return nil, errTxNotFound return nil, errTxNotFound
} }
// It shouldn't happen in practice. // It shouldn't happen in practice.

View File

@ -113,9 +113,9 @@ func (b *testBackend) BlockByNumber(ctx context.Context, number rpc.BlockNumber)
return b.chain.GetBlockByNumber(uint64(number)), nil return b.chain.GetBlockByNumber(uint64(number)), nil
} }
func (b *testBackend) GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) { func (b *testBackend) GetTransaction(ctx context.Context, txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64, error) {
tx, hash, blockNumber, index := rawdb.ReadTransaction(b.chaindb, txHash) tx, hash, blockNumber, index := rawdb.ReadTransaction(b.chaindb, txHash)
return tx, hash, blockNumber, index, nil return tx != nil, tx, hash, blockNumber, index, nil
} }
func (b *testBackend) RPCGasCap() uint64 { func (b *testBackend) RPCGasCap() uint64 {

View File

@ -792,7 +792,7 @@ func (s *Service) reportStats(conn *connWrapper) error {
} }
sync := fullBackend.SyncProgress() sync := fullBackend.SyncProgress()
syncing = fullBackend.CurrentHeader().Number.Uint64() >= sync.HighestBlock syncing = !sync.Done()
price, _ := fullBackend.SuggestGasTipCap(context.Background()) price, _ := fullBackend.SuggestGasTipCap(context.Background())
gasprice = int(price.Uint64()) gasprice = int(price.Uint64())
@ -801,7 +801,7 @@ func (s *Service) reportStats(conn *connWrapper) error {
} }
} else { } else {
sync := s.backend.SyncProgress() sync := s.backend.SyncProgress()
syncing = s.backend.CurrentHeader().Number.Uint64() >= sync.HighestBlock syncing = !sync.Done()
} }
// Assemble the node stats and send it to the server // Assemble the node stats and send it to the server
log.Trace("Sending node details to ethstats") log.Trace("Sending node details to ethstats")

View File

@ -230,8 +230,8 @@ func (t *Transaction) resolve(ctx context.Context) (*types.Transaction, *Block)
return t.tx, t.block return t.tx, t.block
} }
// Try to return an already finalized transaction // Try to return an already finalized transaction
tx, blockHash, _, index, err := t.r.backend.GetTransaction(ctx, t.hash) found, tx, blockHash, _, index, _ := t.r.backend.GetTransaction(ctx, t.hash)
if err == nil && tx != nil { if found {
t.tx = tx t.tx = tx
blockNrOrHash := rpc.BlockNumberOrHashWithHash(blockHash, false) blockNrOrHash := rpc.BlockNumberOrHashWithHash(blockHash, false)
t.block = &Block{ t.block = &Block{
@ -1509,6 +1509,12 @@ func (s *SyncState) HealingTrienodes() hexutil.Uint64 {
func (s *SyncState) HealingBytecode() hexutil.Uint64 { func (s *SyncState) HealingBytecode() hexutil.Uint64 {
return hexutil.Uint64(s.progress.HealingBytecode) return hexutil.Uint64(s.progress.HealingBytecode)
} }
func (s *SyncState) TxIndexFinishedBlocks() hexutil.Uint64 {
return hexutil.Uint64(s.progress.TxIndexFinishedBlocks)
}
func (s *SyncState) TxIndexRemainingBlocks() hexutil.Uint64 {
return hexutil.Uint64(s.progress.TxIndexRemainingBlocks)
}
// Syncing returns false in case the node is currently not syncing with the network. It can be up-to-date or has not // Syncing returns false in case the node is currently not syncing with the network. It can be up-to-date or has not
// yet received the latest block headers from its pears. In case it is synchronizing: // yet received the latest block headers from its pears. In case it is synchronizing:
@ -1527,11 +1533,13 @@ func (s *SyncState) HealingBytecode() hexutil.Uint64 {
// - healedBytecodeBytes: number of bytecodes persisted to disk // - healedBytecodeBytes: number of bytecodes persisted to disk
// - healingTrienodes: number of state trie nodes pending // - healingTrienodes: number of state trie nodes pending
// - healingBytecode: number of bytecodes pending // - healingBytecode: number of bytecodes pending
// - txIndexFinishedBlocks: number of blocks whose transactions are indexed
// - txIndexRemainingBlocks: number of blocks whose transactions are not indexed yet
func (r *Resolver) Syncing() (*SyncState, error) { func (r *Resolver) Syncing() (*SyncState, error) {
progress := r.backend.SyncProgress() progress := r.backend.SyncProgress()
// Return not syncing if the synchronisation already completed // Return not syncing if the synchronisation already completed
if progress.CurrentBlock >= progress.HighestBlock { if progress.Done() {
return nil, nil return nil, nil
} }
// Otherwise gather the block sync stats // Otherwise gather the block sync stats

View File

@ -120,6 +120,18 @@ type SyncProgress struct {
HealingTrienodes uint64 // Number of state trie nodes pending HealingTrienodes uint64 // Number of state trie nodes pending
HealingBytecode uint64 // Number of bytecodes pending HealingBytecode uint64 // Number of bytecodes pending
// "transaction indexing" fields
TxIndexFinishedBlocks uint64 // Number of blocks whose transactions are already indexed
TxIndexRemainingBlocks uint64 // Number of blocks whose transactions are not indexed yet
}
// Done returns the indicator if the initial sync is finished or not.
func (prog SyncProgress) Done() bool {
if prog.CurrentBlock < prog.HighestBlock {
return false
}
return prog.TxIndexRemainingBlocks == 0
} }
// ChainSyncReader wraps access to the node's current sync status. If there's no // ChainSyncReader wraps access to the node's current sync status. If there's no

View File

@ -27,7 +27,6 @@ import (
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/accounts/keystore" "github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/ethereum/go-ethereum/accounts/scwallet" "github.com/ethereum/go-ethereum/accounts/scwallet"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -134,7 +133,7 @@ func (s *EthereumAPI) Syncing() (interface{}, error) {
progress := s.b.SyncProgress() progress := s.b.SyncProgress()
// Return not syncing if the synchronisation already completed // Return not syncing if the synchronisation already completed
if progress.CurrentBlock >= progress.HighestBlock { if progress.Done() {
return false, nil return false, nil
} }
// Otherwise gather the block sync stats // Otherwise gather the block sync stats
@ -154,6 +153,8 @@ func (s *EthereumAPI) Syncing() (interface{}, error) {
"healedBytecodeBytes": hexutil.Uint64(progress.HealedBytecodeBytes), "healedBytecodeBytes": hexutil.Uint64(progress.HealedBytecodeBytes),
"healingTrienodes": hexutil.Uint64(progress.HealingTrienodes), "healingTrienodes": hexutil.Uint64(progress.HealingTrienodes),
"healingBytecode": hexutil.Uint64(progress.HealingBytecode), "healingBytecode": hexutil.Uint64(progress.HealingBytecode),
"txIndexFinishedBlocks": hexutil.Uint64(progress.TxIndexFinishedBlocks),
"txIndexRemainingBlocks": hexutil.Uint64(progress.TxIndexRemainingBlocks),
}, nil }, nil
} }
@ -1133,37 +1134,6 @@ func DoCall(ctx context.Context, b Backend, args TransactionArgs, blockNrOrHash
return doCall(ctx, b, args, state, header, overrides, blockOverrides, timeout, globalGasCap) return doCall(ctx, b, args, state, header, overrides, blockOverrides, timeout, globalGasCap)
} }
func newRevertError(revert []byte) *revertError {
err := vm.ErrExecutionReverted
reason, errUnpack := abi.UnpackRevert(revert)
if errUnpack == nil {
err = fmt.Errorf("%w: %v", vm.ErrExecutionReverted, reason)
}
return &revertError{
error: err,
reason: hexutil.Encode(revert),
}
}
// revertError is an API error that encompasses an EVM revertal with JSON error
// code and a binary data blob.
type revertError struct {
error
reason string // revert reason hex encoded
}
// ErrorCode returns the JSON error code for a revertal.
// See: https://github.com/ethereum/wiki/wiki/JSON-RPC-Error-Codes-Improvement-Proposal
func (e *revertError) ErrorCode() int {
return 3
}
// ErrorData returns the hex encoded revert reason.
func (e *revertError) ErrorData() interface{} {
return e.reason
}
// Call executes the given transaction on the state for the given block number. // Call executes the given transaction on the state for the given block number.
// //
// Additionally, the caller can specify a batch of contract for fields overriding. // Additionally, the caller can specify a batch of contract for fields overriding.
@ -1652,50 +1622,48 @@ func (s *TransactionAPI) GetTransactionCount(ctx context.Context, address common
// GetTransactionByHash returns the transaction for the given hash // GetTransactionByHash returns the transaction for the given hash
func (s *TransactionAPI) GetTransactionByHash(ctx context.Context, hash common.Hash) (*RPCTransaction, error) { func (s *TransactionAPI) GetTransactionByHash(ctx context.Context, hash common.Hash) (*RPCTransaction, error) {
// Try to return an already finalized transaction // Try to return an already finalized transaction
tx, blockHash, blockNumber, index, err := s.b.GetTransaction(ctx, hash) found, tx, blockHash, blockNumber, index, err := s.b.GetTransaction(ctx, hash)
if err != nil { if !found {
return nil, err // No finalized transaction, try to retrieve it from the pool
if tx := s.b.GetPoolTransaction(hash); tx != nil {
return NewRPCPendingTransaction(tx, s.b.CurrentHeader(), s.b.ChainConfig()), nil
}
if err == nil {
return nil, nil
}
return nil, NewTxIndexingError()
} }
if tx != nil {
header, err := s.b.HeaderByHash(ctx, blockHash) header, err := s.b.HeaderByHash(ctx, blockHash)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return newRPCTransaction(tx, blockHash, blockNumber, header.Time, index, header.BaseFee, s.b.ChainConfig()), nil return newRPCTransaction(tx, blockHash, blockNumber, header.Time, index, header.BaseFee, s.b.ChainConfig()), nil
} }
// No finalized transaction, try to retrieve it from the pool
if tx := s.b.GetPoolTransaction(hash); tx != nil {
return NewRPCPendingTransaction(tx, s.b.CurrentHeader(), s.b.ChainConfig()), nil
}
// Transaction unknown, return as such
return nil, nil
}
// GetRawTransactionByHash returns the bytes of the transaction for the given hash. // GetRawTransactionByHash returns the bytes of the transaction for the given hash.
func (s *TransactionAPI) GetRawTransactionByHash(ctx context.Context, hash common.Hash) (hexutil.Bytes, error) { func (s *TransactionAPI) GetRawTransactionByHash(ctx context.Context, hash common.Hash) (hexutil.Bytes, error) {
// Retrieve a finalized transaction, or a pooled otherwise // Retrieve a finalized transaction, or a pooled otherwise
tx, _, _, _, err := s.b.GetTransaction(ctx, hash) found, tx, _, _, _, err := s.b.GetTransaction(ctx, hash)
if err != nil { if !found {
return nil, err if tx = s.b.GetPoolTransaction(hash); tx != nil {
return tx.MarshalBinary()
} }
if tx == nil { if err == nil {
if tx = s.b.GetPoolTransaction(hash); tx == nil {
// Transaction not found anywhere, abort
return nil, nil return nil, nil
} }
return nil, NewTxIndexingError()
} }
// Serialize to RLP and return
return tx.MarshalBinary() return tx.MarshalBinary()
} }
// GetTransactionReceipt returns the transaction receipt for the given transaction hash. // GetTransactionReceipt returns the transaction receipt for the given transaction hash.
func (s *TransactionAPI) GetTransactionReceipt(ctx context.Context, hash common.Hash) (map[string]interface{}, error) { func (s *TransactionAPI) GetTransactionReceipt(ctx context.Context, hash common.Hash) (map[string]interface{}, error) {
tx, blockHash, blockNumber, index, err := s.b.GetTransaction(ctx, hash) found, tx, blockHash, blockNumber, index, err := s.b.GetTransaction(ctx, hash)
if tx == nil || err != nil { if err != nil {
// When the transaction doesn't exist, the RPC method should return JSON null return nil, NewTxIndexingError() // transaction is not fully indexed
// as per specification. }
return nil, nil if !found {
return nil, nil // transaction is not existent or reachable
} }
header, err := s.b.HeaderByHash(ctx, blockHash) header, err := s.b.HeaderByHash(ctx, blockHash)
if err != nil { if err != nil {
@ -2085,15 +2053,15 @@ func (api *DebugAPI) GetRawReceipts(ctx context.Context, blockNrOrHash rpc.Block
// GetRawTransaction returns the bytes of the transaction for the given hash. // GetRawTransaction returns the bytes of the transaction for the given hash.
func (s *DebugAPI) GetRawTransaction(ctx context.Context, hash common.Hash) (hexutil.Bytes, error) { func (s *DebugAPI) GetRawTransaction(ctx context.Context, hash common.Hash) (hexutil.Bytes, error) {
// Retrieve a finalized transaction, or a pooled otherwise // Retrieve a finalized transaction, or a pooled otherwise
tx, _, _, _, err := s.b.GetTransaction(ctx, hash) found, tx, _, _, _, err := s.b.GetTransaction(ctx, hash)
if err != nil { if !found {
return nil, err if tx = s.b.GetPoolTransaction(hash); tx != nil {
return tx.MarshalBinary()
} }
if tx == nil { if err == nil {
if tx = s.b.GetPoolTransaction(hash); tx == nil {
// Transaction not found anywhere, abort
return nil, nil return nil, nil
} }
return nil, NewTxIndexingError()
} }
return tx.MarshalBinary() return tx.MarshalBinary()
} }

View File

@ -583,9 +583,9 @@ func (b testBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) even
func (b testBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error { func (b testBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
panic("implement me") panic("implement me")
} }
func (b testBackend) GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) { func (b testBackend) GetTransaction(ctx context.Context, txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64, error) {
tx, blockHash, blockNumber, index := rawdb.ReadTransaction(b.db, txHash) tx, blockHash, blockNumber, index := rawdb.ReadTransaction(b.db, txHash)
return tx, blockHash, blockNumber, index, nil return true, tx, blockHash, blockNumber, index, nil
} }
func (b testBackend) GetPoolTransactions() (types.Transactions, error) { panic("implement me") } func (b testBackend) GetPoolTransactions() (types.Transactions, error) { panic("implement me") }
func (b testBackend) GetPoolTransaction(txHash common.Hash) *types.Transaction { panic("implement me") } func (b testBackend) GetPoolTransaction(txHash common.Hash) *types.Transaction { panic("implement me") }

View File

@ -75,7 +75,7 @@ type Backend interface {
// Transaction pool API // Transaction pool API
SendTx(ctx context.Context, signedTx *types.Transaction) error SendTx(ctx context.Context, signedTx *types.Transaction) error
GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) GetTransaction(ctx context.Context, txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64, error)
GetPoolTransactions() (types.Transactions, error) GetPoolTransactions() (types.Transactions, error)
GetPoolTransaction(txHash common.Hash) *types.Transaction GetPoolTransaction(txHash common.Hash) *types.Transaction
GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error)

78
internal/ethapi/errors.go Normal file
View File

@ -0,0 +1,78 @@
// Copyright 2024 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package ethapi
import (
"fmt"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/vm"
)
// revertError is an API error that encompasses an EVM revert with JSON error
// code and a binary data blob.
type revertError struct {
error
reason string // revert reason hex encoded
}
// ErrorCode returns the JSON error code for a revert.
// See: https://github.com/ethereum/wiki/wiki/JSON-RPC-Error-Codes-Improvement-Proposal
func (e *revertError) ErrorCode() int {
return 3
}
// ErrorData returns the hex encoded revert reason.
func (e *revertError) ErrorData() interface{} {
return e.reason
}
// newRevertError creates a revertError instance with the provided revert data.
func newRevertError(revert []byte) *revertError {
err := vm.ErrExecutionReverted
reason, errUnpack := abi.UnpackRevert(revert)
if errUnpack == nil {
err = fmt.Errorf("%w: %v", vm.ErrExecutionReverted, reason)
}
return &revertError{
error: err,
reason: hexutil.Encode(revert),
}
}
// TxIndexingError is an API error that indicates the transaction indexing is not
// fully finished yet with JSON error code and a binary data blob.
type TxIndexingError struct{}
// NewTxIndexingError creates a TxIndexingError instance.
func NewTxIndexingError() *TxIndexingError { return &TxIndexingError{} }
// Error implement error interface, returning the error message.
func (e *TxIndexingError) Error() string {
return "transaction indexing is in progress"
}
// ErrorCode returns the JSON error code for a revert.
// See: https://github.com/ethereum/wiki/wiki/JSON-RPC-Error-Codes-Improvement-Proposal
func (e *TxIndexingError) ErrorCode() int {
return 3 // TODO tbd
}
// ErrorData returns the hex encoded revert reason.
func (e *TxIndexingError) ErrorData() interface{} { return "transaction indexing is in progress" }

View File

@ -379,8 +379,8 @@ func (b *backendMock) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) eve
return nil return nil
} }
func (b *backendMock) SendTx(ctx context.Context, signedTx *types.Transaction) error { return nil } func (b *backendMock) SendTx(ctx context.Context, signedTx *types.Transaction) error { return nil }
func (b *backendMock) GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) { func (b *backendMock) GetTransaction(ctx context.Context, txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64, error) {
return nil, [32]byte{}, 0, 0, nil return false, nil, [32]byte{}, 0, 0, nil
} }
func (b *backendMock) GetPoolTransactions() (types.Transactions, error) { return nil, nil } func (b *backendMock) GetPoolTransactions() (types.Transactions, error) { return nil, nil }
func (b *backendMock) GetPoolTransaction(txHash common.Hash) *types.Transaction { return nil } func (b *backendMock) GetPoolTransaction(txHash common.Hash) *types.Transaction { return nil }

View File

@ -3961,6 +3961,8 @@ var outputSyncingFormatter = function(result) {
result.healedBytecodeBytes = utils.toDecimal(result.healedBytecodeBytes); result.healedBytecodeBytes = utils.toDecimal(result.healedBytecodeBytes);
result.healingTrienodes = utils.toDecimal(result.healingTrienodes); result.healingTrienodes = utils.toDecimal(result.healingTrienodes);
result.healingBytecode = utils.toDecimal(result.healingBytecode); result.healingBytecode = utils.toDecimal(result.healingBytecode);
result.txIndexFinishedBlocks = utils.toDecimal(result.txIndexFinishedBlocks);
result.txIndexRemainingBlocks = utils.toDecimal(result.txIndexRemainingBlocks);
return result; return result;
}; };