From 45352300592cf68feb64d67aefa607eec2c9a1de Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Mon, 11 May 2020 17:58:43 +0200 Subject: [PATCH] cmd, core, eth: background transaction indexing (#20302) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * cmd, core, eth: init tx lookup in background * core/rawdb: tiny log fixes to make it clearer what's happening * core, eth: fix rebase errors * core/rawdb: make reindexing less generic, but more optimal * rlp: implement rlp list iterator * core/rawdb: new implementation of tx indexing/unindex using generic tx iterator and hashing rlp-data * core/rawdb, cmd/utils: fix review concerns * cmd/utils: fix merge issue * core/rawdb: add some log formatting polishes Co-authored-by: rjl493456442 Co-authored-by: Péter Szilágyi --- accounts/abi/bind/backends/simulated.go | 2 +- cmd/geth/chaincmd.go | 12 +- cmd/geth/main.go | 1 + cmd/geth/retesteth.go | 2 +- cmd/geth/usage.go | 1 + cmd/utils/flags.go | 23 +- consensus/clique/clique_test.go | 6 +- consensus/clique/snapshot_test.go | 2 +- core/bench_test.go | 4 +- core/block_validator_test.go | 8 +- core/blockchain.go | 147 +++++++++++- core/blockchain_test.go | 269 ++++++++++++++++++--- core/chain_makers_test.go | 2 +- core/dao_test.go | 12 +- core/genesis_test.go | 2 +- core/rawdb/accessors_chain.go | 56 +++++ core/rawdb/accessors_indexes.go | 24 +- core/rawdb/chain_iterator.go | 305 ++++++++++++++++++++++++ core/rawdb/chain_iterator_test.go | 82 +++++++ core/rawdb/freezer_reinit.go | 127 ---------- core/rawdb/schema.go | 6 + core/rlp_test.go | 201 ++++++++++++++++ eth/backend.go | 2 +- eth/config.go | 2 + eth/gen_config.go | 6 + eth/handler.go | 2 + eth/handler_test.go | 6 +- eth/helper_test.go | 2 +- eth/protocol_test.go | 4 +- eth/sync.go | 19 ++ light/odr_test.go | 2 +- light/trie_test.go | 2 +- light/txpool_test.go | 2 +- miner/worker_test.go | 4 +- rlp/iterator.go | 60 +++++ rlp/iterator_test.go | 59 +++++ tests/block_test_util.go | 2 +- 37 files changed, 1268 insertions(+), 200 deletions(-) create mode 100644 core/rawdb/chain_iterator.go create mode 100644 core/rawdb/chain_iterator_test.go delete mode 100644 core/rawdb/freezer_reinit.go create mode 100644 core/rlp_test.go create mode 100644 rlp/iterator.go create mode 100644 rlp/iterator_test.go diff --git a/accounts/abi/bind/backends/simulated.go b/accounts/abi/bind/backends/simulated.go index cfbe2914d..6d4bfe1cb 100644 --- a/accounts/abi/bind/backends/simulated.go +++ b/accounts/abi/bind/backends/simulated.go @@ -76,7 +76,7 @@ type SimulatedBackend struct { func NewSimulatedBackendWithDatabase(database ethdb.Database, alloc core.GenesisAlloc, gasLimit uint64) *SimulatedBackend { genesis := core.Genesis{Config: params.AllEthashProtocolChanges, GasLimit: gasLimit, Alloc: alloc} genesis.MustCommit(database) - blockchain, _ := core.NewBlockChain(database, nil, genesis.Config, ethash.NewFaker(), vm.Config{}, nil) + blockchain, _ := core.NewBlockChain(database, nil, genesis.Config, ethash.NewFaker(), vm.Config{}, nil, nil) backend := &SimulatedBackend{ database: database, diff --git a/cmd/geth/chaincmd.go b/cmd/geth/chaincmd.go index 51eb54a0a..b853c37cc 100644 --- a/cmd/geth/chaincmd.go +++ b/cmd/geth/chaincmd.go @@ -91,6 +91,7 @@ The dumpgenesis command dumps the genesis block configuration in JSON format to utils.MetricsInfluxDBUsernameFlag, utils.MetricsInfluxDBPasswordFlag, utils.MetricsInfluxDBTagsFlag, + utils.TxLookupLimitFlag, }, Category: "BLOCKCHAIN COMMANDS", Description: ` @@ -158,6 +159,7 @@ The export-preimages command export hash preimages to an RLP encoded stream`, utils.FakePoWFlag, utils.RopstenFlag, utils.RinkebyFlag, + utils.TxLookupLimitFlag, utils.GoerliFlag, utils.LegacyTestnetFlag, }, @@ -274,7 +276,7 @@ func importChain(ctx *cli.Context) error { stack := makeFullNode(ctx) defer stack.Close() - chain, db := utils.MakeChain(ctx, stack) + chain, db := utils.MakeChain(ctx, stack, false) defer db.Close() // Start periodically gathering memory profiles @@ -364,7 +366,7 @@ func exportChain(ctx *cli.Context) error { stack := makeFullNode(ctx) defer stack.Close() - chain, _ := utils.MakeChain(ctx, stack) + chain, _ := utils.MakeChain(ctx, stack, true) start := time.Now() var err error @@ -439,7 +441,7 @@ func copyDb(ctx *cli.Context) error { stack := makeFullNode(ctx) defer stack.Close() - chain, chainDb := utils.MakeChain(ctx, stack) + chain, chainDb := utils.MakeChain(ctx, stack, false) syncMode := *utils.GlobalTextMarshaler(ctx, utils.SyncModeFlag.Name).(*downloader.SyncMode) var syncBloom *trie.SyncBloom @@ -547,7 +549,7 @@ func dump(ctx *cli.Context) error { stack := makeFullNode(ctx) defer stack.Close() - chain, chainDb := utils.MakeChain(ctx, stack) + chain, chainDb := utils.MakeChain(ctx, stack, true) defer chainDb.Close() for _, arg := range ctx.Args() { var block *types.Block @@ -586,7 +588,7 @@ func inspect(ctx *cli.Context) error { node, _ := makeConfigNode(ctx) defer node.Close() - _, chainDb := utils.MakeChain(ctx, node) + _, chainDb := utils.MakeChain(ctx, node, true) defer chainDb.Close() return rawdb.InspectDatabase(chainDb) diff --git a/cmd/geth/main.go b/cmd/geth/main.go index b70bcdff5..c5b3a957c 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -92,6 +92,7 @@ var ( utils.ExitWhenSyncedFlag, utils.GCModeFlag, utils.SnapshotFlag, + utils.TxLookupLimitFlag, utils.LightServeFlag, utils.LegacyLightServFlag, utils.LightIngressFlag, diff --git a/cmd/geth/retesteth.go b/cmd/geth/retesteth.go index 77ca285e3..29590b63b 100644 --- a/cmd/geth/retesteth.go +++ b/cmd/geth/retesteth.go @@ -400,7 +400,7 @@ func (api *RetestethAPI) SetChainParams(ctx context.Context, chainParams ChainPa } engine := &NoRewardEngine{inner: inner, rewardsOn: chainParams.SealEngine != "NoReward"} - blockchain, err := core.NewBlockChain(ethDb, nil, chainConfig, engine, vm.Config{}, nil) + blockchain, err := core.NewBlockChain(ethDb, nil, chainConfig, engine, vm.Config{}, nil, nil) if err != nil { return false, err } diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index e1cd37a24..1ae9a7727 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -78,6 +78,7 @@ var AppHelpFlagGroups = []flagGroup{ utils.SyncModeFlag, utils.ExitWhenSyncedFlag, utils.GCModeFlag, + utils.TxLookupLimitFlag, utils.EthStatsURLFlag, utils.IdentityFlag, utils.LightKDFFlag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 9f53f2419..6f6233e4d 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -229,6 +229,11 @@ var ( Name: "snapshot", Usage: `Enables snapshot-database mode -- experimental work in progress feature`, } + TxLookupLimitFlag = cli.Int64Flag{ + Name: "txlookuplimit", + Usage: "Number of recent blocks to maintain transactions index by-hash for (default = index all blocks)", + Value: 0, + } LightKDFFlag = cli.BoolFlag{ Name: "lightkdf", Usage: "Reduce key-derivation RAM & CPU usage at some expense of KDF strength", @@ -1469,7 +1474,11 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) { CheckExclusive(ctx, DeveloperFlag, LegacyTestnetFlag, RopstenFlag, RinkebyFlag, GoerliFlag) CheckExclusive(ctx, LegacyLightServFlag, LightServeFlag, SyncModeFlag, "light") CheckExclusive(ctx, DeveloperFlag, ExternalSignerFlag) // Can't use both ephemeral unlocked and external signer - + CheckExclusive(ctx, GCModeFlag, "archive", TxLookupLimitFlag) + // todo(rjl493456442) make it available for les server + // Ancient tx indices pruning is not available for les server now + // since light client relies on the server for transaction status query. + CheckExclusive(ctx, LegacyLightServFlag, LightServeFlag, TxLookupLimitFlag) var ks *keystore.KeyStore if keystores := stack.AccountManager().Backends(keystore.KeyStoreType); len(keystores) > 0 { ks = keystores[0].(*keystore.KeyStore) @@ -1505,6 +1514,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) { if ctx.GlobalIsSet(CacheNoPrefetchFlag.Name) { cfg.NoPrefetch = ctx.GlobalBool(CacheNoPrefetchFlag.Name) } + if ctx.GlobalIsSet(TxLookupLimitFlag.Name) { + cfg.TxLookupLimit = ctx.GlobalUint64(TxLookupLimitFlag.Name) + } if ctx.GlobalIsSet(CacheFlag.Name) || ctx.GlobalIsSet(CacheTrieFlag.Name) { cfg.TrieCleanCache = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheTrieFlag.Name) / 100 } @@ -1746,7 +1758,7 @@ func MakeGenesis(ctx *cli.Context) *core.Genesis { } // MakeChain creates a chain manager from set command line flags. -func MakeChain(ctx *cli.Context, stack *node.Node) (chain *core.BlockChain, chainDb ethdb.Database) { +func MakeChain(ctx *cli.Context, stack *node.Node, readOnly bool) (chain *core.BlockChain, chainDb ethdb.Database) { var err error chainDb = MakeChainDatabase(ctx, stack) config, _, err := core.SetupGenesisBlock(chainDb, MakeGenesis(ctx)) @@ -1792,7 +1804,12 @@ func MakeChain(ctx *cli.Context, stack *node.Node) (chain *core.BlockChain, chai cache.TrieDirtyLimit = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheGCFlag.Name) / 100 } vmcfg := vm.Config{EnablePreimageRecording: ctx.GlobalBool(VMEnableDebugFlag.Name)} - chain, err = core.NewBlockChain(chainDb, cache, config, engine, vmcfg, nil) + var limit *uint64 + if ctx.GlobalIsSet(TxLookupLimitFlag.Name) && !readOnly { + l := ctx.GlobalUint64(TxLookupLimitFlag.Name) + limit = &l + } + chain, err = core.NewBlockChain(chainDb, cache, config, engine, vmcfg, nil, limit) if err != nil { Fatalf("Can't create BlockChain: %v", err) } diff --git a/consensus/clique/clique_test.go b/consensus/clique/clique_test.go index 710f44805..49313374f 100644 --- a/consensus/clique/clique_test.go +++ b/consensus/clique/clique_test.go @@ -54,7 +54,7 @@ func TestReimportMirroredState(t *testing.T) { genesis := genspec.MustCommit(db) // Generate a batch of blocks, each properly signed - chain, _ := core.NewBlockChain(db, nil, params.AllCliqueProtocolChanges, engine, vm.Config{}, nil) + chain, _ := core.NewBlockChain(db, nil, params.AllCliqueProtocolChanges, engine, vm.Config{}, nil, nil) defer chain.Stop() blocks, _ := core.GenerateChain(params.AllCliqueProtocolChanges, genesis, engine, db, 3, func(i int, block *core.BlockGen) { @@ -88,7 +88,7 @@ func TestReimportMirroredState(t *testing.T) { db = rawdb.NewMemoryDatabase() genspec.MustCommit(db) - chain, _ = core.NewBlockChain(db, nil, params.AllCliqueProtocolChanges, engine, vm.Config{}, nil) + chain, _ = core.NewBlockChain(db, nil, params.AllCliqueProtocolChanges, engine, vm.Config{}, nil, nil) defer chain.Stop() if _, err := chain.InsertChain(blocks[:2]); err != nil { @@ -101,7 +101,7 @@ func TestReimportMirroredState(t *testing.T) { // Simulate a crash by creating a new chain on top of the database, without // flushing the dirty states out. Insert the last block, trigerring a sidechain // reimport. - chain, _ = core.NewBlockChain(db, nil, params.AllCliqueProtocolChanges, engine, vm.Config{}, nil) + chain, _ = core.NewBlockChain(db, nil, params.AllCliqueProtocolChanges, engine, vm.Config{}, nil, nil) defer chain.Stop() if _, err := chain.InsertChain(blocks[2:]); err != nil { diff --git a/consensus/clique/snapshot_test.go b/consensus/clique/snapshot_test.go index fc08722ef..3890fc51d 100644 --- a/consensus/clique/snapshot_test.go +++ b/consensus/clique/snapshot_test.go @@ -448,7 +448,7 @@ func TestClique(t *testing.T) { batches[len(batches)-1] = append(batches[len(batches)-1], block) } // Pass all the headers through clique and ensure tallying succeeds - chain, err := core.NewBlockChain(db, nil, &config, engine, vm.Config{}, nil) + chain, err := core.NewBlockChain(db, nil, &config, engine, vm.Config{}, nil, nil) if err != nil { t.Errorf("test %d: failed to create test chain: %v", i, err) continue diff --git a/core/bench_test.go b/core/bench_test.go index d7a5e11c2..0f4cabd83 100644 --- a/core/bench_test.go +++ b/core/bench_test.go @@ -175,7 +175,7 @@ func benchInsertChain(b *testing.B, disk bool, gen func(int, *BlockGen)) { // Time the insertion of the new chain. // State and blocks are stored in the same DB. - chainman, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) + chainman, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil, nil) defer chainman.Stop() b.ReportAllocs() b.ResetTimer() @@ -287,7 +287,7 @@ func benchReadChain(b *testing.B, full bool, count uint64) { if err != nil { b.Fatalf("error opening database at %v: %v", dir, err) } - chain, err := NewBlockChain(db, nil, params.TestChainConfig, ethash.NewFaker(), vm.Config{}, nil) + chain, err := NewBlockChain(db, nil, params.TestChainConfig, ethash.NewFaker(), vm.Config{}, nil, nil) if err != nil { b.Fatalf("error creating chain: %v", err) } diff --git a/core/block_validator_test.go b/core/block_validator_test.go index 06e2ba1a4..dfb37b88c 100644 --- a/core/block_validator_test.go +++ b/core/block_validator_test.go @@ -42,7 +42,7 @@ func TestHeaderVerification(t *testing.T) { headers[i] = block.Header() } // Run the header checker for blocks one-by-one, checking for both valid and invalid nonces - chain, _ := NewBlockChain(testdb, nil, params.TestChainConfig, ethash.NewFaker(), vm.Config{}, nil) + chain, _ := NewBlockChain(testdb, nil, params.TestChainConfig, ethash.NewFaker(), vm.Config{}, nil, nil) defer chain.Stop() for i := 0; i < len(blocks); i++ { @@ -106,11 +106,11 @@ func testHeaderConcurrentVerification(t *testing.T, threads int) { var results <-chan error if valid { - chain, _ := NewBlockChain(testdb, nil, params.TestChainConfig, ethash.NewFaker(), vm.Config{}, nil) + chain, _ := NewBlockChain(testdb, nil, params.TestChainConfig, ethash.NewFaker(), vm.Config{}, nil, nil) _, results = chain.engine.VerifyHeaders(chain, headers, seals) chain.Stop() } else { - chain, _ := NewBlockChain(testdb, nil, params.TestChainConfig, ethash.NewFakeFailer(uint64(len(headers)-1)), vm.Config{}, nil) + chain, _ := NewBlockChain(testdb, nil, params.TestChainConfig, ethash.NewFakeFailer(uint64(len(headers)-1)), vm.Config{}, nil, nil) _, results = chain.engine.VerifyHeaders(chain, headers, seals) chain.Stop() } @@ -173,7 +173,7 @@ func testHeaderConcurrentAbortion(t *testing.T, threads int) { defer runtime.GOMAXPROCS(old) // Start the verifications and immediately abort - chain, _ := NewBlockChain(testdb, nil, params.TestChainConfig, ethash.NewFakeDelayer(time.Millisecond), vm.Config{}, nil) + chain, _ := NewBlockChain(testdb, nil, params.TestChainConfig, ethash.NewFakeDelayer(time.Millisecond), vm.Config{}, nil, nil) defer chain.Stop() abort, results := chain.engine.VerifyHeaders(chain, headers, seals) diff --git a/core/blockchain.go b/core/blockchain.go index 0d1c27f95..910f7eee5 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -148,6 +148,13 @@ type BlockChain struct { triegc *prque.Prque // Priority queue mapping block numbers to tries to gc gcproc time.Duration // Accumulates canonical block processing for trie dumping + // txLookupLimit is the maximum number of blocks from head whose tx indices + // are reserved: + // * 0: means no limit and regenerate any missing indexes + // * N: means N block limit [HEAD-N+1, HEAD] and delete extra indexes + // * nil: disable tx reindexer/deleter, but still index new blocks + txLookupLimit uint64 + hc *HeaderChain rmLogsFeed event.Feed chainFeed event.Feed @@ -191,7 +198,7 @@ type BlockChain struct { // NewBlockChain returns a fully initialised block chain using information // available in the database. It initialises the default Ethereum Validator and // Processor. -func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config, shouldPreserve func(block *types.Block) bool) (*BlockChain, error) { +func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config, shouldPreserve func(block *types.Block) bool, txLookupLimit *uint64) (*BlockChain, error) { if cacheConfig == nil { cacheConfig = &CacheConfig{ TrieCleanLimit: 256, @@ -246,8 +253,16 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par bc.currentFastBlock.Store(nilBlock) // Initialize the chain with ancient data if it isn't empty. + var txIndexBlock uint64 + if bc.empty() { rawdb.InitDatabaseFromFreezer(bc.db) + // If ancient database is not empty, reconstruct all missing + // indices in the background. + frozen, _ := bc.db.Ancients() + if frozen > 0 { + txIndexBlock = frozen + } } if err := bc.loadLastState(); err != nil { @@ -310,6 +325,10 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par } // Take ownership of this particular state go bc.update() + if txLookupLimit != nil { + bc.txLookupLimit = *txLookupLimit + go bc.maintainTxIndex(txIndexBlock) + } return bc, nil } @@ -1165,8 +1184,23 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ } // Flush data into ancient database. size += rawdb.WriteAncientBlock(bc.db, block, receiptChain[i], bc.GetTd(block.Hash(), block.NumberU64())) - rawdb.WriteTxLookupEntries(batch, block) + // Write tx indices if any condition is satisfied: + // * If user requires to reserve all tx indices(txlookuplimit=0) + // * If all ancient tx indices are required to be reserved(txlookuplimit is even higher than ancientlimit) + // * If block number is large enough to be regarded as a recent block + // It means blocks below the ancientLimit-txlookupLimit won't be indexed. + // + // But if the `TxIndexTail` is not nil, e.g. Geth is initialized with + // an external ancient database, during the setup, blockchain will start + // a background routine to re-indexed all indices in [ancients - txlookupLimit, ancients) + // range. In this case, all tx indices of newly imported blocks should be + // generated. + if bc.txLookupLimit == 0 || ancientLimit <= bc.txLookupLimit || block.NumberU64() >= ancientLimit-bc.txLookupLimit { + rawdb.WriteTxLookupEntries(batch, block) + } else if rawdb.ReadTxIndexTail(bc.db) != nil { + rawdb.WriteTxLookupEntries(batch, block) + } stats.processed++ } // Flush all tx-lookup index data. @@ -1240,7 +1274,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ // Write all the data out into the database rawdb.WriteBody(batch, block.Hash(), block.NumberU64(), block.Body()) rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receiptChain[i]) - rawdb.WriteTxLookupEntries(batch, block) + rawdb.WriteTxLookupEntries(batch, block) // Always write tx indices for live blocks, we assume they are needed // Write everything belongs to the blocks into the database. So that // we can ensure all components of body is completed(body, receipts, @@ -1266,7 +1300,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ updateHead(blockChain[len(blockChain)-1]) return 0, nil } - // Write downloaded chain data and corresponding receipt chain data. + // Write downloaded chain data and corresponding receipt chain data if len(ancientBlocks) > 0 { if n, err := writeAncient(ancientBlocks, ancientReceipts); err != nil { if err == errInsertionInterrupted { @@ -1275,6 +1309,19 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ return n, err } } + // Write the tx index tail (block number from where we index) before write any live blocks + if len(liveBlocks) > 0 && liveBlocks[0].NumberU64() == ancientLimit+1 { + // The tx index tail can only be one of the following two options: + // * 0: all ancient blocks have been indexed + // * ancient-limit: the indices of blocks before ancient-limit are ignored + if tail := rawdb.ReadTxIndexTail(bc.db); tail == nil { + if bc.txLookupLimit == 0 || ancientLimit <= bc.txLookupLimit { + rawdb.WriteTxIndexTail(bc.db, 0) + } else { + rawdb.WriteTxIndexTail(bc.db, ancientLimit-bc.txLookupLimit) + } + } + } if len(liveBlocks) > 0 { if n, err := writeLive(liveBlocks, liveReceipts); err != nil { if err == errInsertionInterrupted { @@ -1298,6 +1345,18 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ return 0, nil } +// SetTxLookupLimit is responsible for updating the txlookup limit to the +// original one stored in db if the new mismatches with the old one. +func (bc *BlockChain) SetTxLookupLimit(limit uint64) { + bc.txLookupLimit = limit +} + +// TxLookupLimit retrieves the txlookup limit used by blockchain to prune +// stale transaction indices. +func (bc *BlockChain) TxLookupLimit() uint64 { + return bc.txLookupLimit +} + var lastWrite uint64 // writeBlockWithoutState writes only the block and its metadata to the database, @@ -2116,6 +2175,86 @@ func (bc *BlockChain) update() { } } +// maintainTxIndex is responsible for the construction and deletion of the +// transaction index. +// +// User can use flag `txlookuplimit` to specify a "recentness" block, below +// which ancient tx indices get deleted. If `txlookuplimit` is 0, it means +// all tx indices will be reserved. +// +// The user can adjust the txlookuplimit value for each launch after fast +// sync, Geth will automatically construct the missing indices and delete +// the extra indices. +func (bc *BlockChain) maintainTxIndex(ancients uint64) { + // Before starting the actual maintenance, we need to handle a special case, + // where user might init Geth with an external ancient database. If so, we + // need to reindex all necessary transactions before starting to process any + // pruning requests. + if ancients > 0 { + var from = uint64(0) + if bc.txLookupLimit != 0 && ancients > bc.txLookupLimit { + from = ancients - bc.txLookupLimit + } + rawdb.IndexTransactions(bc.db, from, ancients) + } + // indexBlocks reindexes or unindexes transactions depending on user configuration + indexBlocks := func(tail *uint64, head uint64, done chan struct{}) { + defer func() { done <- struct{}{} }() + + // If the user just upgraded Geth to a new version which supports transaction + // index pruning, write the new tail and remove anything older. + if tail == nil { + if bc.txLookupLimit == 0 || head < bc.txLookupLimit { + // Nothing to delete, write the tail and return + rawdb.WriteTxIndexTail(bc.db, 0) + } else { + // Prune all stale tx indices and record the tx index tail + rawdb.UnindexTransactions(bc.db, 0, head-bc.txLookupLimit+1) + } + return + } + // If a previous indexing existed, make sure that we fill in any missing entries + if bc.txLookupLimit == 0 || head < bc.txLookupLimit { + if *tail > 0 { + rawdb.IndexTransactions(bc.db, 0, *tail) + } + return + } + // Update the transaction index to the new chain state + if head-bc.txLookupLimit+1 < *tail { + // Reindex a part of missing indices and rewind index tail to HEAD-limit + rawdb.IndexTransactions(bc.db, head-bc.txLookupLimit+1, *tail) + } else { + // Unindex a part of stale indices and forward index tail to HEAD-limit + rawdb.UnindexTransactions(bc.db, *tail, head-bc.txLookupLimit+1) + } + } + // Any reindexing done, start listening to chain events and moving the index window + var ( + done chan struct{} // Non-nil if background unindexing or reindexing routine is active. + headCh = make(chan ChainHeadEvent, 1) // Buffered to avoid locking up the event feed + ) + sub := bc.SubscribeChainHeadEvent(headCh) + if sub == nil { + return + } + defer sub.Unsubscribe() + + for { + select { + case head := <-headCh: + if done == nil { + done = make(chan struct{}) + go indexBlocks(rawdb.ReadTxIndexTail(bc.db), head.Block.NumberU64(), done) + } + case <-done: + done = nil + case <-bc.quit: + return + } + } +} + // BadBlocks returns a list of the last 'bad blocks' that the client has seen on the network func (bc *BlockChain) BadBlocks() []*types.Block { blocks := make([]*types.Block, 0, bc.badBlocks.Len()) diff --git a/core/blockchain_test.go b/core/blockchain_test.go index c18bc7610..96b96425a 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -54,7 +54,7 @@ func newCanonical(engine consensus.Engine, n int, full bool) (ethdb.Database, *B ) // Initialize a fresh chain with only a genesis block - blockchain, _ := NewBlockChain(db, nil, params.AllEthashProtocolChanges, engine, vm.Config{}, nil) + blockchain, _ := NewBlockChain(db, nil, params.AllEthashProtocolChanges, engine, vm.Config{}, nil, nil) // Create and inject the requested chain if n == 0 { return db, blockchain, nil @@ -509,7 +509,7 @@ func testReorgBadHashes(t *testing.T, full bool) { blockchain.Stop() // Create a new BlockChain and check that it rolled back the state. - ncm, err := NewBlockChain(blockchain.db, nil, blockchain.chainConfig, ethash.NewFaker(), vm.Config{}, nil) + ncm, err := NewBlockChain(blockchain.db, nil, blockchain.chainConfig, ethash.NewFaker(), vm.Config{}, nil, nil) if err != nil { t.Fatalf("failed to create new chain manager: %v", err) } @@ -621,7 +621,7 @@ func TestFastVsFullChains(t *testing.T) { // Import the chain as an archive node for the comparison baseline archiveDb := rawdb.NewMemoryDatabase() gspec.MustCommit(archiveDb) - archive, _ := NewBlockChain(archiveDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) + archive, _ := NewBlockChain(archiveDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil, nil) defer archive.Stop() if n, err := archive.InsertChain(blocks); err != nil { @@ -630,7 +630,7 @@ func TestFastVsFullChains(t *testing.T) { // Fast import the chain as a non-archive node to test fastDb := rawdb.NewMemoryDatabase() gspec.MustCommit(fastDb) - fast, _ := NewBlockChain(fastDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) + fast, _ := NewBlockChain(fastDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil, nil) defer fast.Stop() headers := make([]*types.Header, len(blocks)) @@ -654,7 +654,7 @@ func TestFastVsFullChains(t *testing.T) { t.Fatalf("failed to create temp freezer db: %v", err) } gspec.MustCommit(ancientDb) - ancient, _ := NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) + ancient, _ := NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil, nil) defer ancient.Stop() if n, err := ancient.InsertHeaderChain(headers, 1); err != nil { @@ -750,7 +750,7 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) { // Import the chain as an archive node and ensure all pointers are updated archiveDb, delfn := makeDb() defer delfn() - archive, _ := NewBlockChain(archiveDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) + archive, _ := NewBlockChain(archiveDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil, nil) if n, err := archive.InsertChain(blocks); err != nil { t.Fatalf("failed to process block %d: %v", n, err) } @@ -763,7 +763,7 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) { // Import the chain as a non-archive node and ensure all pointers are updated fastDb, delfn := makeDb() defer delfn() - fast, _ := NewBlockChain(fastDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) + fast, _ := NewBlockChain(fastDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil, nil) defer fast.Stop() headers := make([]*types.Header, len(blocks)) @@ -783,7 +783,7 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) { // Import the chain as a ancient-first node and ensure all pointers are updated ancientDb, delfn := makeDb() defer delfn() - ancient, _ := NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) + ancient, _ := NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil, nil) defer ancient.Stop() if n, err := ancient.InsertHeaderChain(headers, 1); err != nil { @@ -802,7 +802,7 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) { // Import the chain as a light node and ensure all pointers are updated lightDb, delfn := makeDb() defer delfn() - light, _ := NewBlockChain(lightDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) + light, _ := NewBlockChain(lightDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil, nil) if n, err := light.InsertHeaderChain(headers, 1); err != nil { t.Fatalf("failed to insert header %d: %v", n, err) } @@ -871,7 +871,7 @@ func TestChainTxReorgs(t *testing.T) { } }) // Import the chain. This runs all block validation rules. - blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) + blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil, nil) if i, err := blockchain.InsertChain(chain); err != nil { t.Fatalf("failed to insert original chain[%d]: %v", i, err) } @@ -941,7 +941,7 @@ func TestLogReorgs(t *testing.T) { signer = types.NewEIP155Signer(gspec.Config.ChainID) ) - blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) + blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil, nil) defer blockchain.Stop() rmLogsCh := make(chan RemovedLogsEvent) @@ -996,6 +996,7 @@ func TestLogRebirth(t *testing.T) { engine = ethash.NewFaker() blockchain, _ = NewBlockChain(db, nil, gspec.Config, engine, vm.Config{}, nil) ) + defer blockchain.Stop() // The event channels. @@ -1058,6 +1059,7 @@ func TestSideLogRebirth(t *testing.T) { signer = types.NewEIP155Signer(gspec.Config.ChainID) blockchain, _ = NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) ) + defer blockchain.Stop() newLogCh := make(chan []*types.Log, 10) @@ -1130,7 +1132,7 @@ func TestReorgSideEvent(t *testing.T) { signer = types.NewEIP155Signer(gspec.Config.ChainID) ) - blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) + blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil, nil) defer blockchain.Stop() chain, _ := GenerateChain(gspec.Config, genesis, ethash.NewFaker(), db, 3, func(i int, gen *BlockGen) {}) @@ -1262,7 +1264,7 @@ func TestEIP155Transition(t *testing.T) { genesis = gspec.MustCommit(db) ) - blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) + blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil, nil) defer blockchain.Stop() blocks, _ := GenerateChain(gspec.Config, genesis, ethash.NewFaker(), db, 4, func(i int, block *BlockGen) { @@ -1370,7 +1372,7 @@ func TestEIP161AccountRemoval(t *testing.T) { } genesis = gspec.MustCommit(db) ) - blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) + blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil, nil) defer blockchain.Stop() blocks, _ := GenerateChain(gspec.Config, genesis, ethash.NewFaker(), db, 3, func(i int, block *BlockGen) { @@ -1445,7 +1447,7 @@ func TestBlockchainHeaderchainReorgConsistency(t *testing.T) { diskdb := rawdb.NewMemoryDatabase() new(Genesis).MustCommit(diskdb) - chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}, nil) + chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}, nil, nil) if err != nil { t.Fatalf("failed to create tester chain: %v", err) } @@ -1489,7 +1491,7 @@ func TestTrieForkGC(t *testing.T) { diskdb := rawdb.NewMemoryDatabase() new(Genesis).MustCommit(diskdb) - chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}, nil) + chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}, nil, nil) if err != nil { t.Fatalf("failed to create tester chain: %v", err) } @@ -1528,7 +1530,7 @@ func TestLargeReorgTrieGC(t *testing.T) { diskdb := rawdb.NewMemoryDatabase() new(Genesis).MustCommit(diskdb) - chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}, nil) + chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}, nil, nil) if err != nil { t.Fatalf("failed to create tester chain: %v", err) } @@ -1588,7 +1590,7 @@ func TestBlockchainRecovery(t *testing.T) { t.Fatalf("failed to create temp freezer db: %v", err) } gspec.MustCommit(ancientDb) - ancient, _ := NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) + ancient, _ := NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil, nil) headers := make([]*types.Header, len(blocks)) for i, block := range blocks { @@ -1607,7 +1609,7 @@ func TestBlockchainRecovery(t *testing.T) { rawdb.WriteHeadFastBlockHash(ancientDb, midBlock.Hash()) // Reopen broken blockchain again - ancient, _ = NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) + ancient, _ = NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil, nil) defer ancient.Stop() if num := ancient.CurrentBlock().NumberU64(); num != 0 { t.Errorf("head block mismatch: have #%v, want #%v", num, 0) @@ -1644,7 +1646,7 @@ func TestIncompleteAncientReceiptChainInsertion(t *testing.T) { t.Fatalf("failed to create temp freezer db: %v", err) } gspec.MustCommit(ancientDb) - ancient, _ := NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) + ancient, _ := NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil, nil) defer ancient.Stop() headers := make([]*types.Header, len(blocks)) @@ -1701,7 +1703,7 @@ func TestLowDiffLongChain(t *testing.T) { diskdb := rawdb.NewMemoryDatabase() new(Genesis).MustCommit(diskdb) - chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}, nil) + chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}, nil, nil) if err != nil { t.Fatalf("failed to create tester chain: %v", err) } @@ -1748,7 +1750,7 @@ func testSideImport(t *testing.T, numCanonBlocksInSidechain, blocksBetweenCommon blocks, _ := GenerateChain(params.TestChainConfig, genesis, engine, db, 2*TriesInMemory, nil) diskdb := rawdb.NewMemoryDatabase() new(Genesis).MustCommit(diskdb) - chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}, nil) + chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}, nil, nil) if err != nil { t.Fatalf("failed to create tester chain: %v", err) } @@ -1845,7 +1847,7 @@ func testInsertKnownChainData(t *testing.T, typ string) { new(Genesis).MustCommit(chaindb) defer os.RemoveAll(dir) - chain, err := NewBlockChain(chaindb, nil, params.TestChainConfig, engine, vm.Config{}, nil) + chain, err := NewBlockChain(chaindb, nil, params.TestChainConfig, engine, vm.Config{}, nil, nil) if err != nil { t.Fatalf("failed to create tester chain: %v", err) } @@ -1961,7 +1963,7 @@ func getLongAndShortChains() (*BlockChain, []*types.Block, []*types.Block, error diskdb := rawdb.NewMemoryDatabase() new(Genesis).MustCommit(diskdb) - chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}, nil) + chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}, nil, nil) if err != nil { return nil, nil, nil, fmt.Errorf("failed to create tester chain: %v", err) } @@ -2065,6 +2067,219 @@ func TestReorgToShorterRemovesCanonMappingHeaderChain(t *testing.T) { } } +func TestTransactionIndices(t *testing.T) { + // Configure and generate a sample block chain + var ( + gendb = rawdb.NewMemoryDatabase() + key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + address = crypto.PubkeyToAddress(key.PublicKey) + funds = big.NewInt(1000000000) + gspec = &Genesis{Config: params.TestChainConfig, Alloc: GenesisAlloc{address: {Balance: funds}}} + genesis = gspec.MustCommit(gendb) + signer = types.NewEIP155Signer(gspec.Config.ChainID) + ) + height := uint64(128) + blocks, receipts := GenerateChain(gspec.Config, genesis, ethash.NewFaker(), gendb, int(height), func(i int, block *BlockGen) { + tx, err := types.SignTx(types.NewTransaction(block.TxNonce(address), common.Address{0x00}, big.NewInt(1000), params.TxGas, nil, nil), signer, key) + if err != nil { + panic(err) + } + block.AddTx(tx) + }) + blocks2, _ := GenerateChain(gspec.Config, blocks[len(blocks)-1], ethash.NewFaker(), gendb, 10, nil) + + check := func(tail *uint64, chain *BlockChain) { + stored := rawdb.ReadTxIndexTail(chain.db) + if tail == nil && stored != nil { + t.Fatalf("Oldest indexded block mismatch, want nil, have %d", *stored) + } + if tail != nil && *stored != *tail { + t.Fatalf("Oldest indexded block mismatch, want %d, have %d", *tail, *stored) + } + if tail != nil { + for i := *tail; i <= chain.CurrentBlock().NumberU64(); i++ { + block := rawdb.ReadBlock(chain.db, rawdb.ReadCanonicalHash(chain.db, i), i) + if block.Transactions().Len() == 0 { + continue + } + for _, tx := range block.Transactions() { + if index := rawdb.ReadTxLookupEntry(chain.db, tx.Hash()); index == nil { + t.Fatalf("Miss transaction indice, number %d hash %s", i, tx.Hash().Hex()) + } + } + } + for i := uint64(0); i < *tail; i++ { + block := rawdb.ReadBlock(chain.db, rawdb.ReadCanonicalHash(chain.db, i), i) + if block.Transactions().Len() == 0 { + continue + } + for _, tx := range block.Transactions() { + if index := rawdb.ReadTxLookupEntry(chain.db, tx.Hash()); index != nil { + t.Fatalf("Transaction indice should be deleted, number %d hash %s", i, tx.Hash().Hex()) + } + } + } + } + } + frdir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatalf("failed to create temp freezer dir: %v", err) + } + defer os.Remove(frdir) + ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "") + if err != nil { + t.Fatalf("failed to create temp freezer db: %v", err) + } + gspec.MustCommit(ancientDb) + + // Import all blocks into ancient db + l := uint64(0) + chain, err := NewBlockChain(ancientDb, nil, params.TestChainConfig, ethash.NewFaker(), vm.Config{}, nil, &l) + if err != nil { + t.Fatalf("failed to create tester chain: %v", err) + } + headers := make([]*types.Header, len(blocks)) + for i, block := range blocks { + headers[i] = block.Header() + } + if n, err := chain.InsertHeaderChain(headers, 0); err != nil { + t.Fatalf("failed to insert header %d: %v", n, err) + } + if n, err := chain.InsertReceiptChain(blocks, receipts, 128); err != nil { + t.Fatalf("block %d: failed to insert into chain: %v", n, err) + } + chain.Stop() + ancientDb.Close() + + // Init block chain with external ancients, check all needed indices has been indexed. + limit := []uint64{0, 32, 64, 128} + for _, l := range limit { + ancientDb, err = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "") + if err != nil { + t.Fatalf("failed to create temp freezer db: %v", err) + } + gspec.MustCommit(ancientDb) + chain, err = NewBlockChain(ancientDb, nil, params.TestChainConfig, ethash.NewFaker(), vm.Config{}, nil, &l) + if err != nil { + t.Fatalf("failed to create tester chain: %v", err) + } + time.Sleep(50 * time.Millisecond) // Wait for indices initialisation + var tail uint64 + if l != 0 { + tail = uint64(128) - l + 1 + } + check(&tail, chain) + chain.Stop() + ancientDb.Close() + } + + // Reconstruct a block chain which only reserves HEAD-64 tx indices + ancientDb, err = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "") + if err != nil { + t.Fatalf("failed to create temp freezer db: %v", err) + } + gspec.MustCommit(ancientDb) + + limit = []uint64{0, 64 /* drop stale */, 32 /* shorten history */, 64 /* extend history */, 0 /* restore all */} + tails := []uint64{0, 67 /* 130 - 64 + 1 */, 100 /* 131 - 32 + 1 */, 69 /* 132 - 64 + 1 */, 0} + for i, l := range limit { + chain, err = NewBlockChain(ancientDb, nil, params.TestChainConfig, ethash.NewFaker(), vm.Config{}, nil, &l) + if err != nil { + t.Fatalf("failed to create tester chain: %v", err) + } + chain.InsertChain(blocks2[i : i+1]) // Feed chain a higher block to trigger indices updater. + time.Sleep(50 * time.Millisecond) // Wait for indices initialisation + check(&tails[i], chain) + chain.Stop() + } +} + +func TestSkipStaleTxIndicesInFastSync(t *testing.T) { + // Configure and generate a sample block chain + var ( + gendb = rawdb.NewMemoryDatabase() + key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + address = crypto.PubkeyToAddress(key.PublicKey) + funds = big.NewInt(1000000000) + gspec = &Genesis{Config: params.TestChainConfig, Alloc: GenesisAlloc{address: {Balance: funds}}} + genesis = gspec.MustCommit(gendb) + signer = types.NewEIP155Signer(gspec.Config.ChainID) + ) + height := uint64(128) + blocks, receipts := GenerateChain(gspec.Config, genesis, ethash.NewFaker(), gendb, int(height), func(i int, block *BlockGen) { + tx, err := types.SignTx(types.NewTransaction(block.TxNonce(address), common.Address{0x00}, big.NewInt(1000), params.TxGas, nil, nil), signer, key) + if err != nil { + panic(err) + } + block.AddTx(tx) + }) + + check := func(tail *uint64, chain *BlockChain) { + stored := rawdb.ReadTxIndexTail(chain.db) + if tail == nil && stored != nil { + t.Fatalf("Oldest indexded block mismatch, want nil, have %d", *stored) + } + if tail != nil && *stored != *tail { + t.Fatalf("Oldest indexded block mismatch, want %d, have %d", *tail, *stored) + } + if tail != nil { + for i := *tail; i <= chain.CurrentBlock().NumberU64(); i++ { + block := rawdb.ReadBlock(chain.db, rawdb.ReadCanonicalHash(chain.db, i), i) + if block.Transactions().Len() == 0 { + continue + } + for _, tx := range block.Transactions() { + if index := rawdb.ReadTxLookupEntry(chain.db, tx.Hash()); index == nil { + t.Fatalf("Miss transaction indice, number %d hash %s", i, tx.Hash().Hex()) + } + } + } + for i := uint64(0); i < *tail; i++ { + block := rawdb.ReadBlock(chain.db, rawdb.ReadCanonicalHash(chain.db, i), i) + if block.Transactions().Len() == 0 { + continue + } + for _, tx := range block.Transactions() { + if index := rawdb.ReadTxLookupEntry(chain.db, tx.Hash()); index != nil { + t.Fatalf("Transaction indice should be deleted, number %d hash %s", i, tx.Hash().Hex()) + } + } + } + } + } + + frdir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatalf("failed to create temp freezer dir: %v", err) + } + defer os.Remove(frdir) + ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "") + if err != nil { + t.Fatalf("failed to create temp freezer db: %v", err) + } + gspec.MustCommit(ancientDb) + + // Import all blocks into ancient db, only HEAD-32 indices are kept. + l := uint64(32) + chain, err := NewBlockChain(ancientDb, nil, params.TestChainConfig, ethash.NewFaker(), vm.Config{}, nil, &l) + if err != nil { + t.Fatalf("failed to create tester chain: %v", err) + } + headers := make([]*types.Header, len(blocks)) + for i, block := range blocks { + headers[i] = block.Header() + } + if n, err := chain.InsertHeaderChain(headers, 0); err != nil { + t.Fatalf("failed to insert header %d: %v", n, err) + } + // The indices before ancient-N(32) should be ignored. After that all blocks should be indexed. + if n, err := chain.InsertReceiptChain(blocks, receipts, 64); err != nil { + t.Fatalf("block %d: failed to insert into chain: %v", n, err) + } + tail := uint64(32) + check(&tail, chain) +} + // Benchmarks large blocks with value transfers to non-existing accounts func benchmarkLargeNumberOfValueToNonexisting(b *testing.B, numTxs, numBlocks int, recipientFn func(uint64) common.Address, dataFn func(uint64) []byte) { var ( @@ -2110,7 +2325,7 @@ func benchmarkLargeNumberOfValueToNonexisting(b *testing.B, numTxs, numBlocks in diskdb := rawdb.NewMemoryDatabase() gspec.MustCommit(diskdb) - chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}, nil) + chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}, nil, nil) if err != nil { b.Fatalf("failed to create tester chain: %v", err) } @@ -2192,7 +2407,7 @@ func TestSideImportPrunedBlocks(t *testing.T) { blocks, _ := GenerateChain(params.TestChainConfig, genesis, engine, db, 2*TriesInMemory, nil) diskdb := rawdb.NewMemoryDatabase() new(Genesis).MustCommit(diskdb) - chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}, nil) + chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}, nil, nil) if err != nil { t.Fatalf("failed to create tester chain: %v", err) } @@ -2286,7 +2501,7 @@ func TestDeleteCreateRevert(t *testing.T) { diskdb := rawdb.NewMemoryDatabase() gspec.MustCommit(diskdb) - chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}, nil) + chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}, nil, nil) if err != nil { t.Fatalf("failed to create tester chain: %v", err) } diff --git a/core/chain_makers_test.go b/core/chain_makers_test.go index 32e3888d5..85a029f7c 100644 --- a/core/chain_makers_test.go +++ b/core/chain_makers_test.go @@ -79,7 +79,7 @@ func ExampleGenerateChain() { }) // Import the chain. This runs all block validation rules. - blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) + blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil, nil) defer blockchain.Stop() if i, err := blockchain.InsertChain(chain); err != nil { diff --git a/core/dao_test.go b/core/dao_test.go index 4e8dba9e8..89e1d83d7 100644 --- a/core/dao_test.go +++ b/core/dao_test.go @@ -45,7 +45,7 @@ func TestDAOForkRangeExtradata(t *testing.T) { proConf.DAOForkBlock = forkBlock proConf.DAOForkSupport = true - proBc, _ := NewBlockChain(proDb, nil, &proConf, ethash.NewFaker(), vm.Config{}, nil) + proBc, _ := NewBlockChain(proDb, nil, &proConf, ethash.NewFaker(), vm.Config{}, nil, nil) defer proBc.Stop() conDb := rawdb.NewMemoryDatabase() @@ -55,7 +55,7 @@ func TestDAOForkRangeExtradata(t *testing.T) { conConf.DAOForkBlock = forkBlock conConf.DAOForkSupport = false - conBc, _ := NewBlockChain(conDb, nil, &conConf, ethash.NewFaker(), vm.Config{}, nil) + conBc, _ := NewBlockChain(conDb, nil, &conConf, ethash.NewFaker(), vm.Config{}, nil, nil) defer conBc.Stop() if _, err := proBc.InsertChain(prefix); err != nil { @@ -69,7 +69,7 @@ func TestDAOForkRangeExtradata(t *testing.T) { // Create a pro-fork block, and try to feed into the no-fork chain db = rawdb.NewMemoryDatabase() gspec.MustCommit(db) - bc, _ := NewBlockChain(db, nil, &conConf, ethash.NewFaker(), vm.Config{}, nil) + bc, _ := NewBlockChain(db, nil, &conConf, ethash.NewFaker(), vm.Config{}, nil, nil) defer bc.Stop() blocks := conBc.GetBlocksFromHash(conBc.CurrentBlock().Hash(), int(conBc.CurrentBlock().NumberU64())) @@ -94,7 +94,7 @@ func TestDAOForkRangeExtradata(t *testing.T) { // Create a no-fork block, and try to feed into the pro-fork chain db = rawdb.NewMemoryDatabase() gspec.MustCommit(db) - bc, _ = NewBlockChain(db, nil, &proConf, ethash.NewFaker(), vm.Config{}, nil) + bc, _ = NewBlockChain(db, nil, &proConf, ethash.NewFaker(), vm.Config{}, nil, nil) defer bc.Stop() blocks = proBc.GetBlocksFromHash(proBc.CurrentBlock().Hash(), int(proBc.CurrentBlock().NumberU64())) @@ -120,7 +120,7 @@ func TestDAOForkRangeExtradata(t *testing.T) { // Verify that contra-forkers accept pro-fork extra-datas after forking finishes db = rawdb.NewMemoryDatabase() gspec.MustCommit(db) - bc, _ := NewBlockChain(db, nil, &conConf, ethash.NewFaker(), vm.Config{}, nil) + bc, _ := NewBlockChain(db, nil, &conConf, ethash.NewFaker(), vm.Config{}, nil, nil) defer bc.Stop() blocks := conBc.GetBlocksFromHash(conBc.CurrentBlock().Hash(), int(conBc.CurrentBlock().NumberU64())) @@ -140,7 +140,7 @@ func TestDAOForkRangeExtradata(t *testing.T) { // Verify that pro-forkers accept contra-fork extra-datas after forking finishes db = rawdb.NewMemoryDatabase() gspec.MustCommit(db) - bc, _ = NewBlockChain(db, nil, &proConf, ethash.NewFaker(), vm.Config{}, nil) + bc, _ = NewBlockChain(db, nil, &proConf, ethash.NewFaker(), vm.Config{}, nil, nil) defer bc.Stop() blocks = proBc.GetBlocksFromHash(proBc.CurrentBlock().Hash(), int(proBc.CurrentBlock().NumberU64())) diff --git a/core/genesis_test.go b/core/genesis_test.go index 06a169868..3470d0aa0 100644 --- a/core/genesis_test.go +++ b/core/genesis_test.go @@ -120,7 +120,7 @@ func TestSetupGenesis(t *testing.T) { // Advance to block #4, past the homestead transition block of customg. genesis := oldcustomg.MustCommit(db) - bc, _ := NewBlockChain(db, nil, oldcustomg.Config, ethash.NewFullFaker(), vm.Config{}, nil) + bc, _ := NewBlockChain(db, nil, oldcustomg.Config, ethash.NewFullFaker(), vm.Config{}, nil, nil) defer bc.Stop() blocks, _ := GenerateChain(oldcustomg.Config, genesis, ethash.NewFaker(), db, 4, nil) diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index 9fa327062..2290e87d5 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -172,6 +172,43 @@ func WriteFastTrieProgress(db ethdb.KeyValueWriter, count uint64) { } } +// ReadTxIndexTail retrieves the number of oldest indexed block +// whose transaction indices has been indexed. If the corresponding entry +// is non-existent in database it means the indexing has been finished. +func ReadTxIndexTail(db ethdb.KeyValueReader) *uint64 { + data, _ := db.Get(txIndexTailKey) + if len(data) != 8 { + return nil + } + number := binary.BigEndian.Uint64(data) + return &number +} + +// WriteTxIndexTail stores the number of oldest indexed block +// into database. +func WriteTxIndexTail(db ethdb.KeyValueWriter, number uint64) { + if err := db.Put(txIndexTailKey, encodeBlockNumber(number)); err != nil { + log.Crit("Failed to store the transaction index tail", "err", err) + } +} + +// ReadFastTxLookupLimit retrieves the tx lookup limit used in fast sync. +func ReadFastTxLookupLimit(db ethdb.KeyValueReader) *uint64 { + data, _ := db.Get(fastTxLookupLimitKey) + if len(data) != 8 { + return nil + } + number := binary.BigEndian.Uint64(data) + return &number +} + +// WriteFastTxLookupLimit stores the txlookup limit used in fast sync into database. +func WriteFastTxLookupLimit(db ethdb.KeyValueWriter, number uint64) { + if err := db.Put(fastTxLookupLimitKey, encodeBlockNumber(number)); err != nil { + log.Crit("Failed to store transaction lookup limit for fast sync", "err", err) + } +} + // ReadHeaderRLP retrieves a block header in its raw RLP database encoding. func ReadHeaderRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { // First try to look up the data in ancient database. Extra hash @@ -290,6 +327,25 @@ func ReadBodyRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue return nil // Can't find the data anywhere. } +// ReadCanonicalBodyRLP retrieves the block body (transactions and uncles) for the canonical +// block at number, in RLP encoding. +func ReadCanonicalBodyRLP(db ethdb.Reader, number uint64) rlp.RawValue { + // If it's an ancient one, we don't need the canonical hash + data, _ := db.Ancient(freezerBodiesTable, number) + if len(data) == 0 { + // Need to get the hash + data, _ = db.Get(blockBodyKey(number, ReadCanonicalHash(db, number))) + // In the background freezer is moving data from leveldb to flatten files. + // So during the first check for ancient db, the data is not yet in there, + // but when we reach into leveldb, the data was already moved. That would + // result in a not found error. + if len(data) == 0 { + data, _ = db.Ancient(freezerBodiesTable, number) + } + } + return data +} + // WriteBodyRLP stores an RLP encoded block body into the database. func WriteBodyRLP(db ethdb.KeyValueWriter, hash common.Hash, number uint64, rlp rlp.RawValue) { if err := db.Put(blockBodyKey(number, hash), rlp); err != nil { diff --git a/core/rawdb/accessors_indexes.go b/core/rawdb/accessors_indexes.go index 38f8fe10e..c7f3df2ad 100644 --- a/core/rawdb/accessors_indexes.go +++ b/core/rawdb/accessors_indexes.go @@ -63,9 +63,31 @@ func WriteTxLookupEntries(db ethdb.KeyValueWriter, block *types.Block) { } } +// WriteTxLookupEntriesByHash is identical to WriteTxLookupEntries, but does not +// require a full types.Block as input. +func WriteTxLookupEntriesByHash(db ethdb.KeyValueWriter, number uint64, hashes []common.Hash) { + numberBytes := new(big.Int).SetUint64(number).Bytes() + for _, hash := range hashes { + if err := db.Put(txLookupKey(hash), numberBytes); err != nil { + log.Crit("Failed to store transaction lookup entry", "err", err) + } + } +} + // DeleteTxLookupEntry removes all transaction data associated with a hash. func DeleteTxLookupEntry(db ethdb.KeyValueWriter, hash common.Hash) { - db.Delete(txLookupKey(hash)) + if err := db.Delete(txLookupKey(hash)); err != nil { + log.Crit("Failed to delete transaction lookup entry", "err", err) + } +} + +// DeleteTxLookupEntries removes all transaction lookups for a given block. +func DeleteTxLookupEntriesByHash(db ethdb.KeyValueWriter, hashes []common.Hash) { + for _, hash := range hashes { + if err := db.Delete(txLookupKey(hash)); err != nil { + log.Crit("Failed to delete transaction lookup entry", "err", err) + } + } } // ReadTransaction retrieves a specific transaction from the database, along with diff --git a/core/rawdb/chain_iterator.go b/core/rawdb/chain_iterator.go new file mode 100644 index 000000000..70513eddd --- /dev/null +++ b/core/rawdb/chain_iterator.go @@ -0,0 +1,305 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "math" + "runtime" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/prque" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" + "golang.org/x/crypto/sha3" +) + +// InitDatabaseFromFreezer reinitializes an empty database from a previous batch +// of frozen ancient blocks. The method iterates over all the frozen blocks and +// injects into the database the block hash->number mappings. +func InitDatabaseFromFreezer(db ethdb.Database) { + // If we can't access the freezer or it's empty, abort + frozen, err := db.Ancients() + if err != nil || frozen == 0 { + return + } + var ( + batch = db.NewBatch() + start = time.Now() + logged = start.Add(-7 * time.Second) // Unindex during import is fast, don't double log + hash common.Hash + ) + for i := uint64(0); i < frozen; i++ { + // Since the freezer has all data in sequential order on a file, + // it would be 'neat' to read more data in one go, and let the + // freezerdb return N items (e.g up to 1000 items per go) + // That would require an API change in Ancients though + if h, err := db.Ancient(freezerHashTable, i); err != nil { + log.Crit("Failed to init database from freezer", "err", err) + } else { + hash = common.BytesToHash(h) + } + WriteHeaderNumber(batch, hash, i) + // If enough data was accumulated in memory or we're at the last block, dump to disk + if batch.ValueSize() > ethdb.IdealBatchSize { + if err := batch.Write(); err != nil { + log.Crit("Failed to write data to db", "err", err) + } + batch.Reset() + } + // If we've spent too much time already, notify the user of what we're doing + if time.Since(logged) > 8*time.Second { + log.Info("Initializing database from freezer", "total", frozen, "number", i, "hash", hash, "elapsed", common.PrettyDuration(time.Since(start))) + logged = time.Now() + } + } + if err := batch.Write(); err != nil { + log.Crit("Failed to write data to db", "err", err) + } + batch.Reset() + + WriteHeadHeaderHash(db, hash) + WriteHeadFastBlockHash(db, hash) + log.Info("Initialized database from freezer", "blocks", frozen, "elapsed", common.PrettyDuration(time.Since(start))) +} + +type blockTxHashes struct { + number uint64 + hashes []common.Hash +} + +// iterateTransactions iterates over all transactions in the (canon) block +// number(s) given, and yields the hashes on a channel +func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool) (chan *blockTxHashes, chan struct{}) { + // One thread sequentially reads data from db + type numberRlp struct { + number uint64 + rlp rlp.RawValue + } + if to == from { + return nil, nil + } + threads := to - from + if cpus := runtime.NumCPU(); threads > uint64(cpus) { + threads = uint64(cpus) + } + var ( + rlpCh = make(chan *numberRlp, threads*2) // we send raw rlp over this channel + hashesCh = make(chan *blockTxHashes, threads*2) // send hashes over hashesCh + abortCh = make(chan struct{}) + ) + // lookup runs in one instance + lookup := func() { + n, end := from, to + if reverse { + n, end = to-1, from-1 + } + defer close(rlpCh) + for n != end { + data := ReadCanonicalBodyRLP(db, n) + // Feed the block to the aggregator, or abort on interrupt + select { + case rlpCh <- &numberRlp{n, data}: + case <-abortCh: + return + } + if reverse { + n-- + } else { + n++ + } + } + } + // process runs in parallell + nThreadsAlive := int32(threads) + process := func() { + defer func() { + // Last processor closes the result channel + if atomic.AddInt32(&nThreadsAlive, -1) == 0 { + close(hashesCh) + } + }() + + var hasher = sha3.NewLegacyKeccak256() + for data := range rlpCh { + it, err := rlp.NewListIterator(data.rlp) + if err != nil { + log.Warn("tx iteration error", "error", err) + return + } + it.Next() + txs := it.Value() + txIt, err := rlp.NewListIterator(txs) + if err != nil { + log.Warn("tx iteration error", "error", err) + return + } + var hashes []common.Hash + for txIt.Next() { + if err := txIt.Err(); err != nil { + log.Warn("tx iteration error", "error", err) + return + } + var txHash common.Hash + hasher.Reset() + hasher.Write(txIt.Value()) + hasher.Sum(txHash[:0]) + hashes = append(hashes, txHash) + } + result := &blockTxHashes{ + hashes: hashes, + number: data.number, + } + // Feed the block to the aggregator, or abort on interrupt + select { + case hashesCh <- result: + case <-abortCh: + return + } + } + } + go lookup() // start the sequential db accessor + for i := 0; i < int(threads); i++ { + go process() + } + return hashesCh, abortCh +} + +// IndexTransactions creates txlookup indices of the specified block range. +// +// This function iterates canonical chain in reverse order, it has one main advantage: +// We can write tx index tail flag periodically even without the whole indexing +// procedure is finished. So that we can resume indexing procedure next time quickly. +func IndexTransactions(db ethdb.Database, from uint64, to uint64) { + // short circuit for invalid range + if from >= to { + return + } + var ( + hashesCh, abortCh = iterateTransactions(db, from, to, true) + batch = db.NewBatch() + start = time.Now() + logged = start.Add(-7 * time.Second) + // Since we iterate in reverse, we expect the first number to come + // in to be [to-1]. Therefore, setting lastNum to means that the + // prqueue gap-evaluation will work correctly + lastNum = to + queue = prque.New(nil) + // for stats reporting + blocks, txs = 0, 0 + ) + defer close(abortCh) + + for chanDelivery := range hashesCh { + // Push the delivery into the queue and process contiguous ranges. + // Since we iterate in reverse, so lower numbers have lower prio, and + // we can use the number directly as prio marker + queue.Push(chanDelivery, int64(chanDelivery.number)) + for !queue.Empty() { + // If the next available item is gapped, return + if _, priority := queue.Peek(); priority != int64(lastNum-1) { + break + } + // Next block available, pop it off and index it + delivery := queue.PopItem().(*blockTxHashes) + lastNum = delivery.number + WriteTxLookupEntriesByHash(batch, delivery.number, delivery.hashes) + blocks++ + txs += len(delivery.hashes) + // If enough data was accumulated in memory or we're at the last block, dump to disk + if batch.ValueSize() > ethdb.IdealBatchSize { + // Also write the tail there + WriteTxIndexTail(batch, lastNum) + if err := batch.Write(); err != nil { + log.Crit("Failed writing batch to db", "error", err) + return + } + batch.Reset() + } + // If we've spent too much time already, notify the user of what we're doing + if time.Since(logged) > 8*time.Second { + log.Info("Indexing transactions", "blocks", blocks, "txs", txs, "tail", lastNum, "total", to-from, "elapsed", common.PrettyDuration(time.Since(start))) + logged = time.Now() + } + } + } + if lastNum < to { + WriteTxIndexTail(batch, lastNum) + // No need to write the batch if we never entered the loop above... + if err := batch.Write(); err != nil { + log.Crit("Failed writing batch to db", "error", err) + return + } + } + log.Info("Indexed transactions", "blocks", blocks, "txs", txs, "tail", lastNum, "elapsed", common.PrettyDuration(time.Since(start))) +} + +// UnindexTransactions removes txlookup indices of the specified block range. +func UnindexTransactions(db ethdb.Database, from uint64, to uint64) { + // short circuit for invalid range + if from >= to { + return + } + // Write flag first and then unindex the transaction indices. Some indices + // will be left in the database if crash happens but it's fine. + WriteTxIndexTail(db, to) + // If only one block is unindexed, do it directly + //if from+1 == to { + // data := ReadCanonicalBodyRLP(db, uint64(from)) + // DeleteTxLookupEntries(db, ReadBlock(db, ReadCanonicalHash(db, from), from)) + // log.Info("Unindexed transactions", "blocks", 1, "tail", to) + // return + //} + // TODO @holiman, add this back (if we want it) + var ( + hashesCh, abortCh = iterateTransactions(db, from, to, false) + batch = db.NewBatch() + start = time.Now() + logged = start.Add(-7 * time.Second) + ) + defer close(abortCh) + // Otherwise spin up the concurrent iterator and unindexer + blocks, txs := 0, 0 + for delivery := range hashesCh { + DeleteTxLookupEntriesByHash(batch, delivery.hashes) + txs += len(delivery.hashes) + blocks++ + + // If enough data was accumulated in memory or we're at the last block, dump to disk + // A batch counts the size of deletion as '1', so we need to flush more + // often than that. + if blocks%1000 == 0 { + if err := batch.Write(); err != nil { + log.Crit("Failed writing batch to db", "error", err) + return + } + batch.Reset() + } + // If we've spent too much time already, notify the user of what we're doing + if time.Since(logged) > 8*time.Second { + log.Info("Unindexing transactions", "blocks", "txs", txs, int64(math.Abs(float64(delivery.number-from))), "total", to-from, "elapsed", common.PrettyDuration(time.Since(start))) + logged = time.Now() + } + } + if err := batch.Write(); err != nil { + log.Crit("Failed writing batch to db", "error", err) + return + } + log.Info("Unindexed transactions", "blocks", blocks, "txs", txs, "tail", to, "elapsed", common.PrettyDuration(time.Since(start))) +} diff --git a/core/rawdb/chain_iterator_test.go b/core/rawdb/chain_iterator_test.go new file mode 100644 index 000000000..c99a97c5f --- /dev/null +++ b/core/rawdb/chain_iterator_test.go @@ -0,0 +1,82 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "math/big" + "reflect" + "sort" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +func TestChainIterator(t *testing.T) { + // Construct test chain db + chainDb := NewMemoryDatabase() + + var block *types.Block + var txs []*types.Transaction + for i := uint64(0); i <= 10; i++ { + if i == 0 { + block = types.NewBlock(&types.Header{Number: big.NewInt(int64(i))}, nil, nil, nil) // Empty genesis block + } else { + tx := types.NewTransaction(i, common.BytesToAddress([]byte{0x11}), big.NewInt(111), 1111, big.NewInt(11111), []byte{0x11, 0x11, 0x11}) + txs = append(txs, tx) + block = types.NewBlock(&types.Header{Number: big.NewInt(int64(i))}, []*types.Transaction{tx}, nil, nil) + } + WriteBlock(chainDb, block) + WriteCanonicalHash(chainDb, block.Hash(), block.NumberU64()) + } + + var cases = []struct { + from, to uint64 + reverse bool + expect []int + }{ + {0, 11, true, []int{10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0}}, + {0, 0, true, nil}, + {0, 5, true, []int{4, 3, 2, 1, 0}}, + {10, 11, true, []int{10}}, + {0, 11, false, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}}, + {0, 0, false, nil}, + {10, 11, false, []int{10}}, + } + for i, c := range cases { + var numbers []int + hashCh, _ := iterateTransactions(chainDb, c.from, c.to, c.reverse) + if hashCh != nil { + for h := range hashCh { + numbers = append(numbers, int(h.number)) + if len(h.hashes) > 0 { + if got, exp := h.hashes[0], txs[h.number-1].Hash(); got != exp { + t.Fatalf("hash wrong, got %x exp %x", got, exp) + } + } + } + } + if !c.reverse { + sort.Ints(numbers) + } else { + sort.Sort(sort.Reverse(sort.IntSlice(numbers))) + } + if !reflect.DeepEqual(numbers, c.expect) { + t.Fatalf("Case %d failed, visit element mismatch, want %v, got %v", i, c.expect, numbers) + } + } +} diff --git a/core/rawdb/freezer_reinit.go b/core/rawdb/freezer_reinit.go deleted file mode 100644 index d6bf9ab1d..000000000 --- a/core/rawdb/freezer_reinit.go +++ /dev/null @@ -1,127 +0,0 @@ -// Copyright 2019 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package rawdb - -import ( - "errors" - "runtime" - "sync/atomic" - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/prque" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/log" -) - -// InitDatabaseFromFreezer reinitializes an empty database from a previous batch -// of frozen ancient blocks. The method iterates over all the frozen blocks and -// injects into the database the block hash->number mappings and the transaction -// lookup entries. -func InitDatabaseFromFreezer(db ethdb.Database) error { - // If we can't access the freezer or it's empty, abort - frozen, err := db.Ancients() - if err != nil || frozen == 0 { - return err - } - // Blocks previously frozen, iterate over- and hash them concurrently - var ( - number = ^uint64(0) // -1 - results = make(chan *types.Block, 4*runtime.NumCPU()) - ) - abort := make(chan struct{}) - defer close(abort) - - for i := 0; i < runtime.NumCPU(); i++ { - go func() { - for { - // Fetch the next task number, terminating if everything's done - n := atomic.AddUint64(&number, 1) - if n >= frozen { - return - } - // Retrieve the block from the freezer. If successful, pre-cache - // the block hash and the individual transaction hashes for storing - // into the database. - block := ReadBlock(db, ReadCanonicalHash(db, n), n) - if block != nil { - block.Hash() - for _, tx := range block.Transactions() { - tx.Hash() - } - } - // Feed the block to the aggregator, or abort on interrupt - select { - case results <- block: - case <-abort: - return - } - } - }() - } - // Reassemble the blocks into a contiguous stream and push them out to disk - var ( - queue = prque.New(nil) - next = int64(0) - - batch = db.NewBatch() - start = time.Now() - logged time.Time - ) - for i := uint64(0); i < frozen; i++ { - // Retrieve the next result and bail if it's nil - block := <-results - if block == nil { - return errors.New("broken ancient database") - } - // Push the block into the import queue and process contiguous ranges - queue.Push(block, -int64(block.NumberU64())) - for !queue.Empty() { - // If the next available item is gapped, return - if _, priority := queue.Peek(); -priority != next { - break - } - // Next block available, pop it off and index it - block = queue.PopItem().(*types.Block) - next++ - - // Inject hash<->number mapping and txlookup indexes - WriteHeaderNumber(batch, block.Hash(), block.NumberU64()) - WriteTxLookupEntries(batch, block) - - // If enough data was accumulated in memory or we're at the last block, dump to disk - if batch.ValueSize() > ethdb.IdealBatchSize || uint64(next) == frozen { - if err := batch.Write(); err != nil { - return err - } - batch.Reset() - } - // If we've spent too much time already, notify the user of what we're doing - if time.Since(logged) > 8*time.Second { - log.Info("Initializing chain from ancient data", "number", block.Number(), "hash", block.Hash(), "total", frozen-1, "elapsed", common.PrettyDuration(time.Since(start))) - logged = time.Now() - } - } - } - hash := ReadCanonicalHash(db, frozen-1) - WriteHeadHeaderHash(db, hash) - WriteHeadFastBlockHash(db, hash) - - log.Info("Initialized chain from ancient data", "number", frozen-1, "hash", hash, "elapsed", common.PrettyDuration(time.Since(start))) - return nil -} diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index 2c20df200..5a41199a7 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -47,6 +47,12 @@ var ( // snapshotJournalKey tracks the in-memory diff layers across restarts. snapshotJournalKey = []byte("SnapshotJournal") + // txIndexTailKey tracks the oldest block whose transactions have been indexed. + txIndexTailKey = []byte("TransactionIndexTail") + + // fastTxLookupLimitKey tracks the transaction lookup limit during fast sync. + fastTxLookupLimitKey = []byte("FastTransactionLookupLimit") + // Data item prefixes (use single byte to avoid mixing data types, avoid `i`, used for indexes). headerPrefix = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header headerTDSuffix = []byte("t") // headerPrefix + num (uint64 big endian) + hash + headerTDSuffix -> td diff --git a/core/rlp_test.go b/core/rlp_test.go new file mode 100644 index 000000000..04daf2fc6 --- /dev/null +++ b/core/rlp_test.go @@ -0,0 +1,201 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package core + +import ( + "fmt" + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/consensus/ethash" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rlp" + "golang.org/x/crypto/sha3" +) + +func getBlock(transactions int, uncles int, dataSize int) *types.Block { + var ( + aa = common.HexToAddress("0x000000000000000000000000000000000000aaaa") + // Generate a canonical chain to act as the main dataset + engine = ethash.NewFaker() + db = rawdb.NewMemoryDatabase() + // A sender who makes transactions, has some funds + key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + address = crypto.PubkeyToAddress(key.PublicKey) + funds = big.NewInt(1000000000) + gspec = &Genesis{ + Config: params.TestChainConfig, + Alloc: GenesisAlloc{address: {Balance: funds}}, + } + genesis = gspec.MustCommit(db) + ) + + // We need to generate as many blocks +1 as uncles + blocks, _ := GenerateChain(params.TestChainConfig, genesis, engine, db, uncles+1, + func(n int, b *BlockGen) { + if n == uncles { + // Add transactions and stuff on the last block + for i := 0; i < transactions; i++ { + tx, _ := types.SignTx(types.NewTransaction(uint64(i), aa, + big.NewInt(0), 50000, big.NewInt(1), make([]byte, dataSize)), types.HomesteadSigner{}, key) + b.AddTx(tx) + } + for i := 0; i < uncles; i++ { + b.AddUncle(&types.Header{ParentHash: b.PrevBlock(n - 1 - i).Hash(), Number: big.NewInt(int64(n - i))}) + } + } + }) + block := blocks[len(blocks)-1] + return block +} + +// TestRlpIterator tests that individual transactions can be picked out +// from blocks without full unmarshalling/marshalling +func TestRlpIterator(t *testing.T) { + for _, tt := range []struct { + txs int + uncles int + datasize int + }{ + {0, 0, 0}, + {0, 2, 0}, + {10, 0, 0}, + {10, 2, 0}, + {10, 2, 50}, + } { + testRlpIterator(t, tt.txs, tt.uncles, tt.datasize) + } +} + +func testRlpIterator(t *testing.T, txs, uncles, datasize int) { + desc := fmt.Sprintf("%d txs [%d datasize] and %d uncles", txs, datasize, uncles) + bodyRlp, _ := rlp.EncodeToBytes(getBlock(txs, uncles, datasize).Body()) + it, err := rlp.NewListIterator(bodyRlp) + if err != nil { + t.Fatal(err) + } + // Check that txs exist + if !it.Next() { + t.Fatal("expected two elems, got zero") + } + txdata := it.Value() + // Check that uncles exist + if !it.Next() { + t.Fatal("expected two elems, got one") + } + // No more after that + if it.Next() { + t.Fatal("expected only two elems, got more") + } + txIt, err := rlp.NewListIterator(txdata) + if err != nil { + t.Fatal(err) + } + var gotHashes []common.Hash + var expHashes []common.Hash + for txIt.Next() { + gotHashes = append(gotHashes, crypto.Keccak256Hash(txIt.Value())) + } + + var expBody types.Body + err = rlp.DecodeBytes(bodyRlp, &expBody) + if err != nil { + t.Fatal(err) + } + for _, tx := range expBody.Transactions { + expHashes = append(expHashes, tx.Hash()) + } + if gotLen, expLen := len(gotHashes), len(expHashes); gotLen != expLen { + t.Fatalf("testcase %v: length wrong, got %d exp %d", desc, gotLen, expLen) + } + // also sanity check against input + if gotLen := len(gotHashes); gotLen != txs { + t.Fatalf("testcase %v: length wrong, got %d exp %d", desc, gotLen, txs) + } + for i, got := range gotHashes { + if exp := expHashes[i]; got != exp { + t.Errorf("testcase %v: hash wrong, got %x, exp %x", desc, got, exp) + } + } +} + +// BenchmarkHashing compares the speeds of hashing a rlp raw data directly +// without the unmarshalling/marshalling step +func BenchmarkHashing(b *testing.B) { + // Make a pretty fat block + var ( + bodyRlp []byte + blockRlp []byte + ) + { + block := getBlock(200, 2, 50) + bodyRlp, _ = rlp.EncodeToBytes(block.Body()) + blockRlp, _ = rlp.EncodeToBytes(block) + } + var got common.Hash + var hasher = sha3.NewLegacyKeccak256() + b.Run("iteratorhashing", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + var hash common.Hash + it, err := rlp.NewListIterator(bodyRlp) + if err != nil { + b.Fatal(err) + } + it.Next() + txs := it.Value() + txIt, err := rlp.NewListIterator(txs) + if err != nil { + b.Fatal(err) + } + for txIt.Next() { + hasher.Reset() + hasher.Write(txIt.Value()) + hasher.Sum(hash[:0]) + got = hash + } + } + }) + var exp common.Hash + b.Run("fullbodyhashing", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + var body types.Body + rlp.DecodeBytes(bodyRlp, &body) + for _, tx := range body.Transactions { + exp = tx.Hash() + } + } + }) + b.Run("fullblockhashing", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + var block types.Block + rlp.DecodeBytes(blockRlp, &block) + for _, tx := range block.Transactions() { + tx.Hash() + } + } + }) + if got != exp { + b.Fatalf("hash wrong, got %x exp %x", got, exp) + } +} diff --git a/eth/backend.go b/eth/backend.go index 3418a587a..391e3c0e6 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -190,7 +190,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { SnapshotLimit: config.SnapshotCache, } ) - eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, chainConfig, eth.engine, vmConfig, eth.shouldPreserve) + eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, chainConfig, eth.engine, vmConfig, eth.shouldPreserve, &config.TxLookupLimit) if err != nil { return nil, err } diff --git a/eth/config.go b/eth/config.go index 57e14eb6e..45767810d 100644 --- a/eth/config.go +++ b/eth/config.go @@ -105,6 +105,8 @@ type Config struct { NoPruning bool // Whether to disable pruning and flush everything to disk NoPrefetch bool // Whether to disable prefetching and only load state on demand + TxLookupLimit uint64 `toml:",omitempty"` // The maximum number of blocks from head whose tx indices are reserved. + // Whitelist of required block number -> hash values to accept Whitelist map[uint64]common.Hash `toml:"-"` diff --git a/eth/gen_config.go b/eth/gen_config.go index 73c9b8579..56980bc50 100644 --- a/eth/gen_config.go +++ b/eth/gen_config.go @@ -24,6 +24,7 @@ func (c Config) MarshalTOML() (interface{}, error) { DiscoveryURLs []string NoPruning bool NoPrefetch bool + TxLookupLimit uint64 `toml:",omitempty"` Whitelist map[uint64]common.Hash `toml:"-"` LightServ int `toml:",omitempty"` LightIngress int `toml:",omitempty"` @@ -60,6 +61,7 @@ func (c Config) MarshalTOML() (interface{}, error) { enc.DiscoveryURLs = c.DiscoveryURLs enc.NoPruning = c.NoPruning enc.NoPrefetch = c.NoPrefetch + enc.TxLookupLimit = c.TxLookupLimit enc.Whitelist = c.Whitelist enc.LightServ = c.LightServ enc.LightIngress = c.LightIngress @@ -98,6 +100,7 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { DiscoveryURLs []string NoPruning *bool NoPrefetch *bool + TxLookupLimit *uint64 `toml:",omitempty"` Whitelist map[uint64]common.Hash `toml:"-"` LightServ *int `toml:",omitempty"` LightIngress *int `toml:",omitempty"` @@ -149,6 +152,9 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { if dec.NoPrefetch != nil { c.NoPrefetch = *dec.NoPrefetch } + if dec.TxLookupLimit != nil { + c.TxLookupLimit = *dec.TxLookupLimit + } if dec.Whitelist != nil { c.Whitelist = dec.Whitelist } diff --git a/eth/handler.go b/eth/handler.go index 9a02f1f20..5a0ffcef6 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -72,6 +72,7 @@ type ProtocolManager struct { txpool txPool blockchain *core.BlockChain + chaindb ethdb.Database maxPeers int downloader *downloader.Downloader @@ -108,6 +109,7 @@ func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCh eventMux: mux, txpool: txpool, blockchain: blockchain, + chaindb: chaindb, peers: newPeerSet(), whitelist: whitelist, txsyncCh: make(chan *txsync), diff --git a/eth/handler_test.go b/eth/handler_test.go index 1b398a8c0..142630857 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -491,7 +491,7 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo } } // Create a checkpoint aware protocol manager - blockchain, err := core.NewBlockChain(db, nil, config, ethash.NewFaker(), vm.Config{}, nil) + blockchain, err := core.NewBlockChain(db, nil, config, ethash.NewFaker(), vm.Config{}, nil, nil) if err != nil { t.Fatalf("failed to create new blockchain: %v", err) } @@ -578,7 +578,7 @@ func testBroadcastBlock(t *testing.T, totalPeers, broadcastExpected int) { gspec = &core.Genesis{Config: config} genesis = gspec.MustCommit(db) ) - blockchain, err := core.NewBlockChain(db, nil, config, pow, vm.Config{}, nil) + blockchain, err := core.NewBlockChain(db, nil, config, pow, vm.Config{}, nil, nil) if err != nil { t.Fatalf("failed to create new blockchain: %v", err) } @@ -639,7 +639,7 @@ func TestBroadcastMalformedBlock(t *testing.T) { gspec = &core.Genesis{Config: config} genesis = gspec.MustCommit(db) ) - blockchain, err := core.NewBlockChain(db, nil, config, engine, vm.Config{}, nil) + blockchain, err := core.NewBlockChain(db, nil, config, engine, vm.Config{}, nil, nil) if err != nil { t.Fatalf("failed to create new blockchain: %v", err) } diff --git a/eth/helper_test.go b/eth/helper_test.go index 3338af71d..65effcc16 100644 --- a/eth/helper_test.go +++ b/eth/helper_test.go @@ -62,7 +62,7 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func Alloc: core.GenesisAlloc{testBank: {Balance: big.NewInt(1000000)}}, } genesis = gspec.MustCommit(db) - blockchain, _ = core.NewBlockChain(db, nil, gspec.Config, engine, vm.Config{}, nil) + blockchain, _ = core.NewBlockChain(db, nil, gspec.Config, engine, vm.Config{}, nil, nil) ) chain, _ := core.GenerateChain(gspec.Config, genesis, ethash.NewFaker(), db, blocks, generator) if _, err := blockchain.InsertChain(chain); err != nil { diff --git a/eth/protocol_test.go b/eth/protocol_test.go index a313e4e6c..fc916a226 100644 --- a/eth/protocol_test.go +++ b/eth/protocol_test.go @@ -175,8 +175,8 @@ func TestForkIDSplit(t *testing.T) { genesisNoFork = gspecNoFork.MustCommit(dbNoFork) genesisProFork = gspecProFork.MustCommit(dbProFork) - chainNoFork, _ = core.NewBlockChain(dbNoFork, nil, configNoFork, engine, vm.Config{}, nil) - chainProFork, _ = core.NewBlockChain(dbProFork, nil, configProFork, engine, vm.Config{}, nil) + chainNoFork, _ = core.NewBlockChain(dbNoFork, nil, configNoFork, engine, vm.Config{}, nil, nil) + chainProFork, _ = core.NewBlockChain(dbProFork, nil, configProFork, engine, vm.Config{}, nil, nil) blocksNoFork, _ = core.GenerateChain(configNoFork, genesisNoFork, engine, dbNoFork, 2, nil) blocksProFork, _ = core.GenerateChain(configProFork, genesisProFork, engine, dbProFork, 2, nil) diff --git a/eth/sync.go b/eth/sync.go index 6802fab0d..d04433f42 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -23,6 +23,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/log" @@ -285,6 +286,24 @@ func (cs *chainSyncer) startSync(op *chainSyncOp) { // doSync synchronizes the local blockchain with a remote peer. func (pm *ProtocolManager) doSync(op *chainSyncOp) error { + if op.mode == downloader.FastSync { + // Before launch the fast sync, we have to ensure user uses the same + // txlookup limit. + // The main concern here is: during the fast sync Geth won't index the + // block(generate tx indices) before the HEAD-limit. But if user changes + // the limit in the next fast sync(e.g. user kill Geth manually and + // restart) then it will be hard for Geth to figure out the oldest block + // has been indexed. So here for the user-experience wise, it's non-optimal + // that user can't change limit during the fast sync. If changed, Geth + // will just blindly use the original one. + limit := pm.blockchain.TxLookupLimit() + if stored := rawdb.ReadFastTxLookupLimit(pm.chaindb); stored == nil { + rawdb.WriteFastTxLookupLimit(pm.chaindb, limit) + } else if *stored != limit { + pm.blockchain.SetTxLookupLimit(*stored) + log.Warn("Update txLookup limit", "provided", limit, "updated", *stored) + } + } // Run the sync cycle, and disable fast sync if we're past the pivot block err := pm.downloader.Synchronise(op.peer.id, op.head, op.td, op.mode) if err != nil { diff --git a/light/odr_test.go b/light/odr_test.go index ac702b6bb..78bf373e6 100644 --- a/light/odr_test.go +++ b/light/odr_test.go @@ -257,7 +257,7 @@ func testChainOdr(t *testing.T, protocol int, fn odrTestFn) { ) gspec.MustCommit(ldb) // Assemble the test environment - blockchain, _ := core.NewBlockChain(sdb, nil, params.TestChainConfig, ethash.NewFullFaker(), vm.Config{}, nil) + blockchain, _ := core.NewBlockChain(sdb, nil, params.TestChainConfig, ethash.NewFullFaker(), vm.Config{}, nil, nil) gchain, _ := core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), sdb, 4, testChainGen) if _, err := blockchain.InsertChain(gchain); err != nil { t.Fatal(err) diff --git a/light/trie_test.go b/light/trie_test.go index 4919f8964..052194b4d 100644 --- a/light/trie_test.go +++ b/light/trie_test.go @@ -40,7 +40,7 @@ func TestNodeIterator(t *testing.T) { genesis = gspec.MustCommit(fulldb) ) gspec.MustCommit(lightdb) - blockchain, _ := core.NewBlockChain(fulldb, nil, params.TestChainConfig, ethash.NewFullFaker(), vm.Config{}, nil) + blockchain, _ := core.NewBlockChain(fulldb, nil, params.TestChainConfig, ethash.NewFullFaker(), vm.Config{}, nil, nil) gchain, _ := core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), fulldb, 4, testChainGen) if _, err := blockchain.InsertChain(gchain); err != nil { panic(err) diff --git a/light/txpool_test.go b/light/txpool_test.go index 0996bd7c9..39d5afe52 100644 --- a/light/txpool_test.go +++ b/light/txpool_test.go @@ -88,7 +88,7 @@ func TestTxPool(t *testing.T) { ) gspec.MustCommit(ldb) // Assemble the test environment - blockchain, _ := core.NewBlockChain(sdb, nil, params.TestChainConfig, ethash.NewFullFaker(), vm.Config{}, nil) + blockchain, _ := core.NewBlockChain(sdb, nil, params.TestChainConfig, ethash.NewFullFaker(), vm.Config{}, nil, nil) gchain, _ := core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), sdb, poolTestBlocks, txPoolTestChainGen) if _, err := blockchain.InsertChain(gchain); err != nil { panic(err) diff --git a/miner/worker_test.go b/miner/worker_test.go index 86bb7db64..65eccbc4c 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -117,7 +117,7 @@ func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine } genesis := gspec.MustCommit(db) - chain, _ := core.NewBlockChain(db, &core.CacheConfig{TrieDirtyDisabled: true}, gspec.Config, engine, vm.Config{}, nil) + chain, _ := core.NewBlockChain(db, &core.CacheConfig{TrieDirtyDisabled: true}, gspec.Config, engine, vm.Config{}, nil, nil) txpool := core.NewTxPool(testTxPoolConfig, chainConfig, chain) // Generate a small n-block chain and an uncle block for it @@ -212,7 +212,7 @@ func testGenerateBlockAndImport(t *testing.T, isClique bool) { // This test chain imports the mined blocks. db2 := rawdb.NewMemoryDatabase() b.genesis.MustCommit(db2) - chain, _ := core.NewBlockChain(db2, nil, b.chain.Config(), engine, vm.Config{}, nil) + chain, _ := core.NewBlockChain(db2, nil, b.chain.Config(), engine, vm.Config{}, nil, nil) defer chain.Stop() // Ignore empty commit here for less noise. diff --git a/rlp/iterator.go b/rlp/iterator.go new file mode 100644 index 000000000..c28866dbc --- /dev/null +++ b/rlp/iterator.go @@ -0,0 +1,60 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rlp + +type listIterator struct { + data []byte + next []byte + err error +} + +// NewListIterator creates an iterator for the (list) represented by data +func NewListIterator(data RawValue) (*listIterator, error) { + k, t, c, err := readKind(data) + if err != nil { + return nil, err + } + if k != List { + return nil, ErrExpectedList + } + it := &listIterator{ + data: data[t : t+c], + } + return it, nil + +} + +// Next forwards the iterator one step, returns true if it was not at end yet +func (it *listIterator) Next() bool { + if len(it.data) == 0 { + return false + } + _, t, c, err := readKind(it.data) + it.next = it.data[:t+c] + it.data = it.data[t+c:] + it.err = err + return true +} + +// Value returns the current value +func (it *listIterator) Value() []byte { + return it.next +} + +func (it *listIterator) Err() error { + return it.err +} diff --git a/rlp/iterator_test.go b/rlp/iterator_test.go new file mode 100644 index 000000000..53c381918 --- /dev/null +++ b/rlp/iterator_test.go @@ -0,0 +1,59 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rlp + +import ( + "testing" + + "github.com/ethereum/go-ethereum/common/hexutil" +) + +// TestIterator tests some basic things about the ListIterator. A more +// comprehensive test can be found in core/rlp_test.go, where we can +// use both types and rlp without dependency cycles +func TestIterator(t *testing.T) { + bodyRlpHex := "0xf902cbf8d6f869800182c35094000000000000000000000000000000000000aaaa808a000000000000000000001ba01025c66fad28b4ce3370222624d952c35529e602af7cbe04f667371f61b0e3b3a00ab8813514d1217059748fd903288ace1b4001a4bc5fbde2790debdc8167de2ff869010182c35094000000000000000000000000000000000000aaaa808a000000000000000000001ca05ac4cf1d19be06f3742c21df6c49a7e929ceb3dbaf6a09f3cfb56ff6828bd9a7a06875970133a35e63ac06d360aa166d228cc013e9b96e0a2cae7f55b22e1ee2e8f901f0f901eda0c75448377c0e426b8017b23c5f77379ecf69abc1d5c224284ad3ba1c46c59adaa00000000000000000000000000000000000000000000000000000000000000000940000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000b9010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000808080808080a00000000000000000000000000000000000000000000000000000000000000000880000000000000000" + bodyRlp := hexutil.MustDecode(bodyRlpHex) + + it, err := NewListIterator(bodyRlp) + if err != nil { + t.Fatal(err) + } + // Check that txs exist + if !it.Next() { + t.Fatal("expected two elems, got zero") + } + txs := it.Value() + // Check that uncles exist + if !it.Next() { + t.Fatal("expected two elems, got one") + } + txit, err := NewListIterator(txs) + if err != nil { + t.Fatal(err) + } + var i = 0 + for txit.Next() { + if txit.err != nil { + t.Fatal(txit.err) + } + i++ + } + if exp := 2; i != exp { + t.Errorf("count wrong, expected %d got %d", i, exp) + } +} diff --git a/tests/block_test_util.go b/tests/block_test_util.go index dd622c2bc..be9cdb70c 100644 --- a/tests/block_test_util.go +++ b/tests/block_test_util.go @@ -124,7 +124,7 @@ func (t *BlockTest) Run(snapshotter bool) error { cache.SnapshotLimit = 1 cache.SnapshotWait = true } - chain, err := core.NewBlockChain(db, cache, config, engine, vm.Config{}, nil) + chain, err := core.NewBlockChain(db, cache, config, engine, vm.Config{}, nil, nil) if err != nil { return err }