From 80469bea0cc6dbfae749d944094a7c2357dc050d Mon Sep 17 00:00:00 2001 From: gary rong Date: Thu, 25 Apr 2019 22:59:48 +0800 Subject: [PATCH] all: integrate the freezer with fast sync * all: freezer style syncing core, eth, les, light: clean up freezer relative APIs core, eth, les, trie, ethdb, light: clean a bit core, eth, les, light: add unit tests core, light: rewrite setHead function core, eth: fix downloader unit tests core: add receipt chain insertion test core: use constant instead of hardcoding table name core: fix rollback core: fix setHead core/rawdb: remove canonical block first and then iterate side chain core/rawdb, ethdb: add hasAncient interface eth/downloader: calculate ancient limit via cht first core, eth, ethdb: lots of fixes * eth/downloader: print ancient disable log only for fast sync --- core/blockchain.go | 395 +++++++++++++++++++++++------- core/blockchain_test.go | 211 ++++++++++++++-- core/headerchain.go | 64 +++-- core/rawdb/accessors_chain.go | 178 ++++++++++---- core/rawdb/accessors_indexes.go | 12 +- core/rawdb/accessors_metadata.go | 12 +- core/rawdb/database.go | 31 ++- core/rawdb/freezer.go | 212 +++++++++++----- core/rawdb/freezer_table.go | 10 +- core/rawdb/schema.go | 17 ++ core/rawdb/table.go | 32 ++- core/state/database.go | 2 +- core/state/sync.go | 2 +- eth/downloader/downloader.go | 54 +++- eth/downloader/downloader_test.go | 72 +++++- ethdb/batch.go | 4 +- ethdb/database.go | 56 +++-- ethdb/leveldb/leveldb.go | 4 +- ethdb/memorydb/memorydb.go | 2 +- les/odr_requests.go | 2 +- light/lightchain.go | 6 +- light/nodeset.go | 6 +- light/trie.go | 2 +- trie/database.go | 2 +- trie/proof.go | 8 +- trie/sync.go | 6 +- 26 files changed, 1076 insertions(+), 326 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 61b809319..4ac2c3a44 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -63,6 +63,8 @@ var ( blockPrefetchExecuteTimer = metrics.NewRegisteredTimer("chain/prefetch/executes", nil) blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil) + + errInsertionInterrupted = errors.New("insertion is interrupted") ) const ( @@ -138,7 +140,6 @@ type BlockChain struct { chainmu sync.RWMutex // blockchain insertion lock - checkpoint int // checkpoint counts towards the new checkpoint currentBlock atomic.Value // Current head of the block chain currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!) @@ -161,8 +162,9 @@ type BlockChain struct { processor Processor // Block transaction processor interface vmConfig vm.Config - badBlocks *lru.Cache // Bad block cache - shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block. + badBlocks *lru.Cache // Bad block cache + shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block. + terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion. } // NewBlockChain returns a fully initialised block chain using information @@ -216,6 +218,39 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par if err := bc.loadLastState(); err != nil { return nil, err } + if frozen, err := bc.db.Ancients(); err == nil && frozen >= 1 { + var ( + needRewind bool + low uint64 + ) + // The head full block may be rolled back to a very low height due to + // blockchain repair. If the head full block is even lower than the ancient + // chain, truncate the ancient store. + fullBlock := bc.CurrentBlock() + if fullBlock != nil && fullBlock != bc.genesisBlock && fullBlock.NumberU64() < frozen-1 { + needRewind = true + low = fullBlock.NumberU64() + } + // In fast sync, it may happen that ancient data has been written to the + // ancient store, but the LastFastBlock has not been updated, truncate the + // extra data here. + fastBlock := bc.CurrentFastBlock() + if fastBlock != nil && fastBlock.NumberU64() < frozen-1 { + needRewind = true + if fastBlock.NumberU64() < low || low == 0 { + low = fastBlock.NumberU64() + } + } + if needRewind { + var hashes []common.Hash + previous := bc.CurrentHeader().Number.Uint64() + for i := low + 1; i <= bc.CurrentHeader().Number.Uint64(); i++ { + hashes = append(hashes, rawdb.ReadCanonicalHash(bc.db, i)) + } + bc.Rollback(hashes) + log.Warn("Truncate ancient chain", "from", previous, "to", low) + } + } // Check the current state of the block hashes and make sure that we do not have any of the bad blocks in our chain for hash := range BadHashes { if header := bc.GetHeaderByHash(hash); header != nil { @@ -267,6 +302,7 @@ func (bc *BlockChain) loadLastState() error { if err := bc.repair(¤tBlock); err != nil { return err } + rawdb.WriteHeadBlockHash(bc.db, currentBlock.Hash()) } // Everything seems to be fine, set as the head block bc.currentBlock.Store(currentBlock) @@ -312,12 +348,55 @@ func (bc *BlockChain) SetHead(head uint64) error { bc.chainmu.Lock() defer bc.chainmu.Unlock() - // Rewind the header chain, deleting all block bodies until then - delFn := func(db ethdb.Writer, hash common.Hash, num uint64) { - rawdb.DeleteBody(db, hash, num) + updateFn := func(db ethdb.KeyValueWriter, header *types.Header) { + // Rewind the block chain, ensuring we don't end up with a stateless head block + if currentBlock := bc.CurrentBlock(); currentBlock != nil && header.Number.Uint64() < currentBlock.NumberU64() { + newHeadBlock := bc.GetBlock(header.Hash(), header.Number.Uint64()) + if newHeadBlock == nil { + newHeadBlock = bc.genesisBlock + } else { + if _, err := state.New(newHeadBlock.Root(), bc.stateCache); err != nil { + // Rewound state missing, rolled back to before pivot, reset to genesis + newHeadBlock = bc.genesisBlock + } + } + rawdb.WriteHeadBlockHash(db, newHeadBlock.Hash()) + bc.currentBlock.Store(newHeadBlock) + } + + // Rewind the fast block in a simpleton way to the target head + if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock != nil && header.Number.Uint64() < currentFastBlock.NumberU64() { + newHeadFastBlock := bc.GetBlock(header.Hash(), header.Number.Uint64()) + // If either blocks reached nil, reset to the genesis state + if newHeadFastBlock == nil { + newHeadFastBlock = bc.genesisBlock + } + rawdb.WriteHeadFastBlockHash(db, newHeadFastBlock.Hash()) + bc.currentFastBlock.Store(newHeadFastBlock) + } } - bc.hc.SetHead(head, delFn) - currentHeader := bc.hc.CurrentHeader() + + // Rewind the header chain, deleting all block bodies until then + delFn := func(db ethdb.KeyValueWriter, hash common.Hash, num uint64) { + // Ignore the error here since light client won't hit this path + frozen, _ := bc.db.Ancients() + if num+1 <= frozen { + // Truncate all relative data(header, total difficulty, body, receipt + // and canonical hash) from ancient store. + bc.db.TruncateAncients(num + 1) + + // Remove the hash <-> number mapping from the active store. + rawdb.DeleteHeaderNumber(db, hash) + } else { + // Remove relative body and receipts from the active store. + // The header, total difficulty and canonical hash will be + // removed in the hc.SetHead function. + rawdb.DeleteBody(db, hash, num) + rawdb.DeleteReceipts(db, hash, num) + } + // Todo(rjl493456442) txlookup, bloombits, etc + } + bc.hc.SetHead(head, updateFn, delFn) // Clear out any stale content from the caches bc.bodyCache.Purge() @@ -326,33 +405,6 @@ func (bc *BlockChain) SetHead(head uint64) error { bc.blockCache.Purge() bc.futureBlocks.Purge() - // Rewind the block chain, ensuring we don't end up with a stateless head block - if currentBlock := bc.CurrentBlock(); currentBlock != nil && currentHeader.Number.Uint64() < currentBlock.NumberU64() { - bc.currentBlock.Store(bc.GetBlock(currentHeader.Hash(), currentHeader.Number.Uint64())) - } - if currentBlock := bc.CurrentBlock(); currentBlock != nil { - if _, err := state.New(currentBlock.Root(), bc.stateCache); err != nil { - // Rewound state missing, rolled back to before pivot, reset to genesis - bc.currentBlock.Store(bc.genesisBlock) - } - } - // Rewind the fast block in a simpleton way to the target head - if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock != nil && currentHeader.Number.Uint64() < currentFastBlock.NumberU64() { - bc.currentFastBlock.Store(bc.GetBlock(currentHeader.Hash(), currentHeader.Number.Uint64())) - } - // If either blocks reached nil, reset to the genesis state - if currentBlock := bc.CurrentBlock(); currentBlock == nil { - bc.currentBlock.Store(bc.genesisBlock) - } - if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock == nil { - bc.currentFastBlock.Store(bc.genesisBlock) - } - currentBlock := bc.CurrentBlock() - currentFastBlock := bc.CurrentFastBlock() - - rawdb.WriteHeadBlockHash(bc.db, currentBlock.Hash()) - rawdb.WriteHeadFastBlockHash(bc.db, currentFastBlock.Hash()) - return bc.loadLastState() } @@ -780,96 +832,259 @@ func (bc *BlockChain) Rollback(chain []common.Hash) { } if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock.Hash() == hash { newFastBlock := bc.GetBlock(currentFastBlock.ParentHash(), currentFastBlock.NumberU64()-1) - bc.currentFastBlock.Store(newFastBlock) rawdb.WriteHeadFastBlockHash(bc.db, newFastBlock.Hash()) + bc.currentFastBlock.Store(newFastBlock) } if currentBlock := bc.CurrentBlock(); currentBlock.Hash() == hash { newBlock := bc.GetBlock(currentBlock.ParentHash(), currentBlock.NumberU64()-1) - bc.currentBlock.Store(newBlock) rawdb.WriteHeadBlockHash(bc.db, newBlock.Hash()) + bc.currentBlock.Store(newBlock) } } + // Truncate ancient data which exceeds the current header. + // + // Notably, it can happen that system crashes without truncating the ancient data + // but the head indicator has been updated in the active store. Regarding this issue, + // system will self recovery by truncating the extra data during the setup phase. + if err := bc.truncateAncient(bc.hc.CurrentHeader().Number.Uint64()); err != nil { + log.Crit("Truncate ancient store failed", "err", err) + } +} + +// truncateAncient rewinds the blockchain to the specified header and deletes all +// data in the ancient store that exceeds the specified header. +func (bc *BlockChain) truncateAncient(head uint64) error { + frozen, err := bc.db.Ancients() + if err != nil { + return err + } + // Short circuit if there is no data to truncate in ancient store. + if frozen <= head+1 { + return nil + } + // Truncate all the data in the freezer beyond the specified head + if err := bc.db.TruncateAncients(head + 1); err != nil { + return err + } + // Clear out any stale content from the caches + bc.hc.headerCache.Purge() + bc.hc.tdCache.Purge() + bc.hc.numberCache.Purge() + + // Clear out any stale content from the caches + bc.bodyCache.Purge() + bc.bodyRLPCache.Purge() + bc.receiptsCache.Purge() + bc.blockCache.Purge() + bc.futureBlocks.Purge() + + log.Info("Rewind ancient data", "number", head) + return nil } // InsertReceiptChain attempts to complete an already existing header chain with // transaction and receipt data. -func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) { +func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain []types.Receipts, ancientLimit uint64) (int, error) { bc.wg.Add(1) defer bc.wg.Done() + var ( + ancientBlocks, liveBlocks types.Blocks + ancientReceipts, liveReceipts []types.Receipts + ) // Do a sanity check that the provided chain is actually ordered and linked - for i := 1; i < len(blockChain); i++ { - if blockChain[i].NumberU64() != blockChain[i-1].NumberU64()+1 || blockChain[i].ParentHash() != blockChain[i-1].Hash() { - log.Error("Non contiguous receipt insert", "number", blockChain[i].Number(), "hash", blockChain[i].Hash(), "parent", blockChain[i].ParentHash(), - "prevnumber", blockChain[i-1].Number(), "prevhash", blockChain[i-1].Hash()) - return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, blockChain[i-1].NumberU64(), - blockChain[i-1].Hash().Bytes()[:4], i, blockChain[i].NumberU64(), blockChain[i].Hash().Bytes()[:4], blockChain[i].ParentHash().Bytes()[:4]) + for i := 0; i < len(blockChain); i++ { + if i != 0 { + if blockChain[i].NumberU64() != blockChain[i-1].NumberU64()+1 || blockChain[i].ParentHash() != blockChain[i-1].Hash() { + log.Error("Non contiguous receipt insert", "number", blockChain[i].Number(), "hash", blockChain[i].Hash(), "parent", blockChain[i].ParentHash(), + "prevnumber", blockChain[i-1].Number(), "prevhash", blockChain[i-1].Hash()) + return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, blockChain[i-1].NumberU64(), + blockChain[i-1].Hash().Bytes()[:4], i, blockChain[i].NumberU64(), blockChain[i].Hash().Bytes()[:4], blockChain[i].ParentHash().Bytes()[:4]) + } + } + if blockChain[i].NumberU64() <= ancientLimit { + ancientBlocks, ancientReceipts = append(ancientBlocks, blockChain[i]), append(ancientReceipts, receiptChain[i]) + } else { + liveBlocks, liveReceipts = append(liveBlocks, blockChain[i]), append(liveReceipts, receiptChain[i]) } } var ( stats = struct{ processed, ignored int32 }{} start = time.Now() - bytes = 0 - batch = bc.db.NewBatch() + size = 0 ) - for i, block := range blockChain { - receipts := receiptChain[i] - // Short circuit insertion if shutting down or processing failed - if atomic.LoadInt32(&bc.procInterrupt) == 1 { - return 0, nil - } - // Short circuit if the owner header is unknown - if !bc.HasHeader(block.Hash(), block.NumberU64()) { - return i, fmt.Errorf("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4]) - } - // Skip if the entire data is already known - if bc.HasBlock(block.Hash(), block.NumberU64()) { - stats.ignored++ - continue - } - // Compute all the non-consensus fields of the receipts - if err := receipts.DeriveFields(bc.chainConfig, block.Hash(), block.NumberU64(), block.Transactions()); err != nil { - return i, fmt.Errorf("failed to derive receipts data: %v", err) - } - // Write all the data out into the database - rawdb.WriteBody(batch, block.Hash(), block.NumberU64(), block.Body()) - rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receipts) - rawdb.WriteTxLookupEntries(batch, block) - - stats.processed++ - - if batch.ValueSize() >= ethdb.IdealBatchSize { - if err := batch.Write(); err != nil { - return 0, err + // updateHead updates the head fast sync block if the inserted blocks are better + // and returns a indicator whether the inserted blocks are canonical. + updateHead := func(head *types.Block) bool { + var isCanonical bool + bc.chainmu.Lock() + if td := bc.GetTd(head.Hash(), head.NumberU64()); td != nil { // Rewind may have occurred, skip in that case + currentFastBlock := bc.CurrentFastBlock() + if bc.GetTd(currentFastBlock.Hash(), currentFastBlock.NumberU64()).Cmp(td) < 0 { + rawdb.WriteHeadFastBlockHash(bc.db, head.Hash()) + bc.currentFastBlock.Store(head) + isCanonical = true } - bytes += batch.ValueSize() - batch.Reset() } + bc.chainmu.Unlock() + return isCanonical } - if batch.ValueSize() > 0 { - bytes += batch.ValueSize() + // writeAncient writes blockchain and corresponding receipt chain into ancient store. + // + // this function only accepts canonical chain data. All side chain will be reverted + // eventually. + writeAncient := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) { + var ( + previous = bc.CurrentFastBlock() + batch = bc.db.NewBatch() + ) + // If any error occurs before updating the head or we are inserting a side chain, + // all the data written this time wll be rolled back. + defer func() { + if previous != nil { + if err := bc.truncateAncient(previous.NumberU64()); err != nil { + log.Crit("Truncate ancient store failed", "err", err) + } + } + }() + for i, block := range blockChain { + // Short circuit insertion if shutting down or processing failed + if atomic.LoadInt32(&bc.procInterrupt) == 1 { + return 0, errInsertionInterrupted + } + // Short circuit insertion if it is required(used in testing only) + if bc.terminateInsert != nil && bc.terminateInsert(block.Hash(), block.NumberU64()) { + return i, errors.New("insertion is terminated for testing purpose") + } + // Short circuit if the owner header is unknown + if !bc.HasHeader(block.Hash(), block.NumberU64()) { + return i, fmt.Errorf("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4]) + } + // Compute all the non-consensus fields of the receipts + if err := receiptChain[i].DeriveFields(bc.chainConfig, block.Hash(), block.NumberU64(), block.Transactions()); err != nil { + return i, fmt.Errorf("failed to derive receipts data: %v", err) + } + // Initialize freezer with genesis block first + if frozen, err := bc.db.Ancients(); err == nil && frozen == 0 && block.NumberU64() == 1 { + genesisBlock := rawdb.ReadBlock(bc.db, rawdb.ReadCanonicalHash(bc.db, 0), 0) + size += rawdb.WriteAncientBlock(bc.db, genesisBlock, nil, genesisBlock.Difficulty()) + } + // Flush data into ancient store. + size += rawdb.WriteAncientBlock(bc.db, block, receiptChain[i], bc.GetTd(block.Hash(), block.NumberU64())) + rawdb.WriteTxLookupEntries(batch, block) + + stats.processed++ + } + // Flush all tx-lookup index data. + size += batch.ValueSize() if err := batch.Write(); err != nil { return 0, err } - } + batch.Reset() - // Update the head fast sync block if better - bc.chainmu.Lock() - head := blockChain[len(blockChain)-1] - if td := bc.GetTd(head.Hash(), head.NumberU64()); td != nil { // Rewind may have occurred, skip in that case - currentFastBlock := bc.CurrentFastBlock() - if bc.GetTd(currentFastBlock.Hash(), currentFastBlock.NumberU64()).Cmp(td) < 0 { - rawdb.WriteHeadFastBlockHash(bc.db, head.Hash()) - bc.currentFastBlock.Store(head) + // Sync the ancient store explicitly to ensure all data has been flushed to disk. + if err := bc.db.Sync(); err != nil { + return 0, err + } + if !updateHead(blockChain[len(blockChain)-1]) { + return 0, errors.New("side blocks can't be accepted as the ancient chain data") + } + previous = nil // disable rollback explicitly + + // Remove the ancient data from the active store + cleanGenesis := len(blockChain) > 0 && blockChain[0].NumberU64() == 1 + if cleanGenesis { + // Migrate genesis block to ancient store too. + rawdb.DeleteBlockWithoutNumber(batch, rawdb.ReadCanonicalHash(bc.db, 0), 0) + rawdb.DeleteCanonicalHash(batch, 0) + } + // Wipe out canonical block data. + for _, block := range blockChain { + rawdb.DeleteBlockWithoutNumber(batch, block.Hash(), block.NumberU64()) + rawdb.DeleteCanonicalHash(batch, block.NumberU64()) + } + if err := batch.Write(); err != nil { + return 0, err + } + batch.Reset() + // Wipe out side chain too. + for _, block := range blockChain { + for _, hash := range rawdb.ReadAllHashes(bc.db, block.NumberU64()) { + rawdb.DeleteBlock(batch, hash, block.NumberU64()) + } + } + if err := batch.Write(); err != nil { + return 0, err + } + return 0, nil + } + // writeLive writes blockchain and corresponding receipt chain into active store. + writeLive := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) { + batch := bc.db.NewBatch() + for i, block := range blockChain { + // Short circuit insertion if shutting down or processing failed + if atomic.LoadInt32(&bc.procInterrupt) == 1 { + return 0, errInsertionInterrupted + } + // Short circuit if the owner header is unknown + if !bc.HasHeader(block.Hash(), block.NumberU64()) { + return i, fmt.Errorf("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4]) + } + if bc.HasBlock(block.Hash(), block.NumberU64()) { + stats.ignored++ + continue + } + // Compute all the non-consensus fields of the receipts + if err := receiptChain[i].DeriveFields(bc.chainConfig, block.Hash(), block.NumberU64(), block.Transactions()); err != nil { + return i, fmt.Errorf("failed to derive receipts data: %v", err) + } + // Write all the data out into the database + rawdb.WriteBody(batch, block.Hash(), block.NumberU64(), block.Body()) + rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receiptChain[i]) + rawdb.WriteTxLookupEntries(batch, block) + + stats.processed++ + if batch.ValueSize() >= ethdb.IdealBatchSize { + if err := batch.Write(); err != nil { + return 0, err + } + size += batch.ValueSize() + batch.Reset() + } + } + if batch.ValueSize() > 0 { + size += batch.ValueSize() + if err := batch.Write(); err != nil { + return 0, err + } + } + updateHead(blockChain[len(blockChain)-1]) + return 0, nil + } + // Write downloaded chain data and corresponding receipt chain data. + if len(ancientBlocks) > 0 { + if n, err := writeAncient(ancientBlocks, ancientReceipts); err != nil { + if err == errInsertionInterrupted { + return 0, nil + } + return n, err + } + } + if len(liveBlocks) > 0 { + if n, err := writeLive(liveBlocks, liveReceipts); err != nil { + if err == errInsertionInterrupted { + return 0, nil + } + return n, err } } - bc.chainmu.Unlock() + head := blockChain[len(blockChain)-1] context := []interface{}{ "count", stats.processed, "elapsed", common.PrettyDuration(time.Since(start)), "number", head.Number(), "hash", head.Hash(), "age", common.PrettyAge(time.Unix(int64(head.Time()), 0)), - "size", common.StorageSize(bytes), + "size", common.StorageSize(size), } if stats.ignored > 0 { context = append(context, []interface{}{"ignored", stats.ignored}...) diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 70e3207f5..7b1a9a54f 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -18,8 +18,10 @@ package core import ( "fmt" + "io/ioutil" "math/big" "math/rand" + "os" "sync" "testing" "time" @@ -33,7 +35,6 @@ import ( "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/ethdb/memorydb" "github.com/ethereum/go-ethereum/params" ) @@ -639,7 +640,27 @@ func TestFastVsFullChains(t *testing.T) { if n, err := fast.InsertHeaderChain(headers, 1); err != nil { t.Fatalf("failed to insert header %d: %v", n, err) } - if n, err := fast.InsertReceiptChain(blocks, receipts); err != nil { + if n, err := fast.InsertReceiptChain(blocks, receipts, 0); err != nil { + t.Fatalf("failed to insert receipt %d: %v", n, err) + } + // Freezer style fast import the chain. + frdir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatalf("failed to create temp freezer dir: %v", err) + } + defer os.Remove(frdir) + ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "") + if err != nil { + t.Fatalf("failed to create temp freezer db: %v", err) + } + gspec.MustCommit(ancientDb) + ancient, _ := NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) + defer ancient.Stop() + + if n, err := ancient.InsertHeaderChain(headers, 1); err != nil { + t.Fatalf("failed to insert header %d: %v", n, err) + } + if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(len(blocks)/2)); err != nil { t.Fatalf("failed to insert receipt %d: %v", n, err) } // Iterate over all chain data components, and cross reference @@ -647,26 +668,35 @@ func TestFastVsFullChains(t *testing.T) { num, hash := blocks[i].NumberU64(), blocks[i].Hash() if ftd, atd := fast.GetTdByHash(hash), archive.GetTdByHash(hash); ftd.Cmp(atd) != 0 { - t.Errorf("block #%d [%x]: td mismatch: have %v, want %v", num, hash, ftd, atd) + t.Errorf("block #%d [%x]: td mismatch: fastdb %v, archivedb %v", num, hash, ftd, atd) + } + if antd, artd := ancient.GetTdByHash(hash), archive.GetTdByHash(hash); antd.Cmp(artd) != 0 { + t.Errorf("block #%d [%x]: td mismatch: ancientdb %v, archivedb %v", num, hash, antd, artd) } if fheader, aheader := fast.GetHeaderByHash(hash), archive.GetHeaderByHash(hash); fheader.Hash() != aheader.Hash() { - t.Errorf("block #%d [%x]: header mismatch: have %v, want %v", num, hash, fheader, aheader) + t.Errorf("block #%d [%x]: header mismatch: fastdb %v, archivedb %v", num, hash, fheader, aheader) } - if fblock, ablock := fast.GetBlockByHash(hash), archive.GetBlockByHash(hash); fblock.Hash() != ablock.Hash() { - t.Errorf("block #%d [%x]: block mismatch: have %v, want %v", num, hash, fblock, ablock) - } else if types.DeriveSha(fblock.Transactions()) != types.DeriveSha(ablock.Transactions()) { - t.Errorf("block #%d [%x]: transactions mismatch: have %v, want %v", num, hash, fblock.Transactions(), ablock.Transactions()) - } else if types.CalcUncleHash(fblock.Uncles()) != types.CalcUncleHash(ablock.Uncles()) { - t.Errorf("block #%d [%x]: uncles mismatch: have %v, want %v", num, hash, fblock.Uncles(), ablock.Uncles()) + if anheader, arheader := ancient.GetHeaderByHash(hash), archive.GetHeaderByHash(hash); anheader.Hash() != arheader.Hash() { + t.Errorf("block #%d [%x]: header mismatch: ancientdb %v, archivedb %v", num, hash, anheader, arheader) } - if freceipts, areceipts := rawdb.ReadReceipts(fastDb, hash, *rawdb.ReadHeaderNumber(fastDb, hash), fast.Config()), rawdb.ReadReceipts(archiveDb, hash, *rawdb.ReadHeaderNumber(archiveDb, hash), archive.Config()); types.DeriveSha(freceipts) != types.DeriveSha(areceipts) { - t.Errorf("block #%d [%x]: receipts mismatch: have %v, want %v", num, hash, freceipts, areceipts) + if fblock, arblock, anblock := fast.GetBlockByHash(hash), archive.GetBlockByHash(hash), ancient.GetBlockByHash(hash); fblock.Hash() != arblock.Hash() || anblock.Hash() != arblock.Hash() { + t.Errorf("block #%d [%x]: block mismatch: fastdb %v, ancientdb %v, archivedb %v", num, hash, fblock, anblock, arblock) + } else if types.DeriveSha(fblock.Transactions()) != types.DeriveSha(arblock.Transactions()) || types.DeriveSha(anblock.Transactions()) != types.DeriveSha(arblock.Transactions()) { + t.Errorf("block #%d [%x]: transactions mismatch: fastdb %v, ancientdb %v, archivedb %v", num, hash, fblock.Transactions(), anblock.Transactions(), arblock.Transactions()) + } else if types.CalcUncleHash(fblock.Uncles()) != types.CalcUncleHash(arblock.Uncles()) || types.CalcUncleHash(anblock.Uncles()) != types.CalcUncleHash(arblock.Uncles()) { + t.Errorf("block #%d [%x]: uncles mismatch: fastdb %v, ancientdb %v, archivedb %v", num, hash, fblock.Uncles(), anblock, arblock.Uncles()) + } + if freceipts, anreceipts, areceipts := rawdb.ReadReceipts(fastDb, hash, *rawdb.ReadHeaderNumber(fastDb, hash), fast.Config()), rawdb.ReadReceipts(ancientDb, hash, *rawdb.ReadHeaderNumber(ancientDb, hash), fast.Config()), rawdb.ReadReceipts(archiveDb, hash, *rawdb.ReadHeaderNumber(archiveDb, hash), fast.Config()); types.DeriveSha(freceipts) != types.DeriveSha(areceipts) { + t.Errorf("block #%d [%x]: receipts mismatch: fastdb %v, ancientdb %v, archivedb %v", num, hash, freceipts, anreceipts, areceipts) } } // Check that the canonical chains are the same between the databases for i := 0; i < len(blocks)+1; i++ { if fhash, ahash := rawdb.ReadCanonicalHash(fastDb, uint64(i)), rawdb.ReadCanonicalHash(archiveDb, uint64(i)); fhash != ahash { - t.Errorf("block #%d: canonical hash mismatch: have %v, want %v", i, fhash, ahash) + t.Errorf("block #%d: canonical hash mismatch: fastdb %v, archivedb %v", i, fhash, ahash) + } + if anhash, arhash := rawdb.ReadCanonicalHash(ancientDb, uint64(i)), rawdb.ReadCanonicalHash(archiveDb, uint64(i)); anhash != arhash { + t.Errorf("block #%d: canonical hash mismatch: ancientdb %v, archivedb %v", i, anhash, arhash) } } } @@ -730,13 +760,40 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) { if n, err := fast.InsertHeaderChain(headers, 1); err != nil { t.Fatalf("failed to insert header %d: %v", n, err) } - if n, err := fast.InsertReceiptChain(blocks, receipts); err != nil { + if n, err := fast.InsertReceiptChain(blocks, receipts, 0); err != nil { t.Fatalf("failed to insert receipt %d: %v", n, err) } assert(t, "fast", fast, height, height, 0) fast.Rollback(remove) assert(t, "fast", fast, height/2, height/2, 0) + // Import the chain as a ancient-first node and ensure all pointers are updated + frdir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatalf("failed to create temp freezer dir: %v", err) + } + defer os.Remove(frdir) + ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "") + if err != nil { + t.Fatalf("failed to create temp freezer db: %v", err) + } + gspec.MustCommit(ancientDb) + ancient, _ := NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) + defer ancient.Stop() + + if n, err := ancient.InsertHeaderChain(headers, 1); err != nil { + t.Fatalf("failed to insert header %d: %v", n, err) + } + if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err != nil { + t.Fatalf("failed to insert receipt %d: %v", n, err) + } + assert(t, "ancient", ancient, height, height, 0) + ancient.Rollback(remove) + assert(t, "ancient", ancient, height/2, height/2, 0) + if frozen, err := ancientDb.Ancients(); err != nil || frozen != height/2+1 { + t.Fatalf("failed to truncate ancient store, want %v, have %v", height/2+1, frozen) + } + // Import the chain as a light node and ensure all pointers are updated lightDb := rawdb.NewMemoryDatabase() gspec.MustCommit(lightDb) @@ -918,7 +975,7 @@ func TestLogRebirth(t *testing.T) { var ( key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") addr1 = crypto.PubkeyToAddress(key1.PublicKey) - db = memorydb.New() + db = rawdb.NewMemoryDatabase() // this code generates a log code = common.Hex2Bytes("60606040525b7f24ec1d3ff24c2f6ff210738839dbc339cd45a5294d85c79361016243157aae7b60405180905060405180910390a15b600a8060416000396000f360606040526008565b00") @@ -1040,7 +1097,7 @@ func TestSideLogRebirth(t *testing.T) { var ( key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") addr1 = crypto.PubkeyToAddress(key1.PublicKey) - db = memorydb.New() + db = rawdb.NewMemoryDatabase() // this code generates a log code = common.Hex2Bytes("60606040525b7f24ec1d3ff24c2f6ff210738839dbc339cd45a5294d85c79361016243157aae7b60405180905060405180910390a15b600a8060416000396000f360606040526008565b00") @@ -1564,6 +1621,122 @@ func TestLargeReorgTrieGC(t *testing.T) { } } +func TestBlockchainRecovery(t *testing.T) { + // Configure and generate a sample block chain + var ( + gendb = rawdb.NewMemoryDatabase() + key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + address = crypto.PubkeyToAddress(key.PublicKey) + funds = big.NewInt(1000000000) + gspec = &Genesis{Config: params.TestChainConfig, Alloc: GenesisAlloc{address: {Balance: funds}}} + genesis = gspec.MustCommit(gendb) + ) + height := uint64(1024) + blocks, receipts := GenerateChain(gspec.Config, genesis, ethash.NewFaker(), gendb, int(height), nil) + + // Import the chain as a ancient-first node and ensure all pointers are updated + frdir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatalf("failed to create temp freezer dir: %v", err) + } + defer os.Remove(frdir) + ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "") + if err != nil { + t.Fatalf("failed to create temp freezer db: %v", err) + } + gspec.MustCommit(ancientDb) + ancient, _ := NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) + + headers := make([]*types.Header, len(blocks)) + for i, block := range blocks { + headers[i] = block.Header() + } + if n, err := ancient.InsertHeaderChain(headers, 1); err != nil { + t.Fatalf("failed to insert header %d: %v", n, err) + } + if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err != nil { + t.Fatalf("failed to insert receipt %d: %v", n, err) + } + ancient.Stop() + + // Destroy head fast block manually + midBlock := blocks[len(blocks)/2] + rawdb.WriteHeadFastBlockHash(ancientDb, midBlock.Hash()) + + // Reopen broken blockchain again + ancient, _ = NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) + defer ancient.Stop() + if num := ancient.CurrentBlock().NumberU64(); num != 0 { + t.Errorf("head block mismatch: have #%v, want #%v", num, 0) + } + if num := ancient.CurrentFastBlock().NumberU64(); num != midBlock.NumberU64() { + t.Errorf("head fast-block mismatch: have #%v, want #%v", num, midBlock.NumberU64()) + } + if num := ancient.CurrentHeader().Number.Uint64(); num != midBlock.NumberU64() { + t.Errorf("head header mismatch: have #%v, want #%v", num, midBlock.NumberU64()) + } +} + +func TestIncompleteAncientReceiptChainInsertion(t *testing.T) { + // Configure and generate a sample block chain + var ( + gendb = rawdb.NewMemoryDatabase() + key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + address = crypto.PubkeyToAddress(key.PublicKey) + funds = big.NewInt(1000000000) + gspec = &Genesis{Config: params.TestChainConfig, Alloc: GenesisAlloc{address: {Balance: funds}}} + genesis = gspec.MustCommit(gendb) + ) + height := uint64(1024) + blocks, receipts := GenerateChain(gspec.Config, genesis, ethash.NewFaker(), gendb, int(height), nil) + + // Import the chain as a ancient-first node and ensure all pointers are updated + frdir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatalf("failed to create temp freezer dir: %v", err) + } + defer os.Remove(frdir) + ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "") + if err != nil { + t.Fatalf("failed to create temp freezer db: %v", err) + } + gspec.MustCommit(ancientDb) + ancient, _ := NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) + defer ancient.Stop() + + headers := make([]*types.Header, len(blocks)) + for i, block := range blocks { + headers[i] = block.Header() + } + if n, err := ancient.InsertHeaderChain(headers, 1); err != nil { + t.Fatalf("failed to insert header %d: %v", n, err) + } + // Abort ancient receipt chain insertion deliberately + ancient.terminateInsert = func(hash common.Hash, number uint64) bool { + if number == blocks[len(blocks)/2].NumberU64() { + return true + } + return false + } + previousFastBlock := ancient.CurrentFastBlock() + if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err == nil { + t.Fatalf("failed to insert receipt %d: %v", n, err) + } + if ancient.CurrentFastBlock().NumberU64() != previousFastBlock.NumberU64() { + t.Fatalf("failed to rollback ancient data, want %d, have %d", previousFastBlock.NumberU64(), ancient.CurrentFastBlock().NumberU64()) + } + if frozen, err := ancient.db.Ancients(); err != nil || frozen != 1 { + t.Fatalf("failed to truncate ancient data") + } + ancient.terminateInsert = nil + if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err != nil { + t.Fatalf("failed to insert receipt %d: %v", n, err) + } + if ancient.CurrentFastBlock().NumberU64() != blocks[len(blocks)-1].NumberU64() { + t.Fatalf("failed to insert ancient recept chain after rollback") + } +} + // Tests that importing a very large side fork, which is larger than the canon chain, // but where the difficulty per block is kept low: this means that it will not // overtake the 'canon' chain until after it's passed canon by about 200 blocks. @@ -1764,7 +1937,7 @@ func testInsertKnownChainData(t *testing.T, typ string) { if err != nil { return err } - _, err = chain.InsertReceiptChain(blocks, receipts) + _, err = chain.InsertReceiptChain(blocks, receipts, 0) return err } asserter = func(t *testing.T, block *types.Block) { @@ -2019,14 +2192,12 @@ func BenchmarkBlockChain_1x1000ValueTransferToNonexisting(b *testing.B) { numTxs = 1000 numBlocks = 1 ) - recipientFn := func(nonce uint64) common.Address { return common.BigToAddress(big.NewInt(0).SetUint64(1337 + nonce)) } dataFn := func(nonce uint64) []byte { return nil } - benchmarkLargeNumberOfValueToNonexisting(b, numTxs, numBlocks, recipientFn, dataFn) } @@ -2044,7 +2215,6 @@ func BenchmarkBlockChain_1x1000ValueTransferToExisting(b *testing.B) { dataFn := func(nonce uint64) []byte { return nil } - benchmarkLargeNumberOfValueToNonexisting(b, numTxs, numBlocks, recipientFn, dataFn) } @@ -2062,6 +2232,5 @@ func BenchmarkBlockChain_1x1000Executions(b *testing.B) { dataFn := func(nonce uint64) []byte { return nil } - benchmarkLargeNumberOfValueToNonexisting(b, numTxs, numBlocks, recipientFn, dataFn) } diff --git a/core/headerchain.go b/core/headerchain.go index d0c1987fb..659141fd1 100644 --- a/core/headerchain.go +++ b/core/headerchain.go @@ -453,33 +453,56 @@ func (hc *HeaderChain) SetCurrentHeader(head *types.Header) { hc.currentHeaderHash = head.Hash() } -// DeleteCallback is a callback function that is called by SetHead before -// each header is deleted. -type DeleteCallback func(ethdb.Writer, common.Hash, uint64) +type ( + // UpdateHeadBlocksCallback is a callback function that is called by SetHead + // before head header is updated. + UpdateHeadBlocksCallback func(ethdb.KeyValueWriter, *types.Header) + + // DeleteBlockContentCallback is a callback function that is called by SetHead + // before each header is deleted. + DeleteBlockContentCallback func(ethdb.KeyValueWriter, common.Hash, uint64) +) // SetHead rewinds the local chain to a new head. Everything above the new head // will be deleted and the new one set. -func (hc *HeaderChain) SetHead(head uint64, delFn DeleteCallback) { - height := uint64(0) - - if hdr := hc.CurrentHeader(); hdr != nil { - height = hdr.Number.Uint64() - } - batch := hc.chainDb.NewBatch() +func (hc *HeaderChain) SetHead(head uint64, updateFn UpdateHeadBlocksCallback, delFn DeleteBlockContentCallback) { + var ( + parentHash common.Hash + batch = hc.chainDb.NewBatch() + ) for hdr := hc.CurrentHeader(); hdr != nil && hdr.Number.Uint64() > head; hdr = hc.CurrentHeader() { - hash := hdr.Hash() - num := hdr.Number.Uint64() + hash, num := hdr.Hash(), hdr.Number.Uint64() + + // Rewind block chain to new head. + parent := hc.GetHeader(hdr.ParentHash, num-1) + if parent == nil { + parent = hc.genesisHeader + } + parentHash = hdr.ParentHash + // Notably, since geth has the possibility for setting the head to a low + // height which is even lower than ancient head. + // In order to ensure that the head is always no higher than the data in + // the database(ancient store or active store), we need to update head + // first then remove the relative data from the database. + // + // Update head first(head fast block, head full block) before deleting the data. + if updateFn != nil { + updateFn(hc.chainDb, parent) + } + // Update head header then. + rawdb.WriteHeadHeaderHash(hc.chainDb, parentHash) + + // Remove the relative data from the database. if delFn != nil { delFn(batch, hash, num) } + // Rewind header chain to new head. rawdb.DeleteHeader(batch, hash, num) rawdb.DeleteTd(batch, hash, num) + rawdb.DeleteCanonicalHash(batch, num) - hc.currentHeader.Store(hc.GetHeader(hdr.ParentHash, hdr.Number.Uint64()-1)) - } - // Roll back the canonical chain numbering - for i := height; i > head; i-- { - rawdb.DeleteCanonicalHash(batch, i) + hc.currentHeader.Store(parent) + hc.currentHeaderHash = parentHash } batch.Write() @@ -487,13 +510,6 @@ func (hc *HeaderChain) SetHead(head uint64, delFn DeleteCallback) { hc.headerCache.Purge() hc.tdCache.Purge() hc.numberCache.Purge() - - if hc.CurrentHeader() == nil { - hc.currentHeader.Store(hc.genesisHeader) - } - hc.currentHeaderHash = hc.CurrentHeader().Hash() - - rawdb.WriteHeadHeaderHash(hc.chainDb, hc.currentHeaderHash) } // SetGenesis sets a new genesis block header for the chain diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index 103f18f78..681e6e917 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -30,10 +30,17 @@ import ( ) // ReadCanonicalHash retrieves the hash assigned to a canonical block number. -func ReadCanonicalHash(db ethdb.AncientReader, number uint64) common.Hash { - data, _ := db.Ancient("hashes", number) +func ReadCanonicalHash(db ethdb.Reader, number uint64) common.Hash { + data, _ := db.Ancient(freezerHashTable, number) if len(data) == 0 { data, _ = db.Get(headerHashKey(number)) + // In the background freezer is moving data from leveldb to flatten files. + // So during the first check for ancient db, the data is not yet in there, + // but when we reach into leveldb, the data was already moved. That would + // result in a not found error. + if len(data) == 0 { + data, _ = db.Ancient(freezerHashTable, number) + } } if len(data) == 0 { return common.Hash{} @@ -42,29 +49,28 @@ func ReadCanonicalHash(db ethdb.AncientReader, number uint64) common.Hash { } // WriteCanonicalHash stores the hash assigned to a canonical block number. -func WriteCanonicalHash(db ethdb.Writer, hash common.Hash, number uint64) { +func WriteCanonicalHash(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { if err := db.Put(headerHashKey(number), hash.Bytes()); err != nil { log.Crit("Failed to store number to hash mapping", "err", err) } } // DeleteCanonicalHash removes the number to hash canonical mapping. -func DeleteCanonicalHash(db ethdb.Writer, number uint64) { +func DeleteCanonicalHash(db ethdb.KeyValueWriter, number uint64) { if err := db.Delete(headerHashKey(number)); err != nil { log.Crit("Failed to delete number to hash mapping", "err", err) } } -// readAllHashes retrieves all the hashes assigned to blocks at a certain heights, +// ReadAllHashes retrieves all the hashes assigned to blocks at a certain heights, // both canonical and reorged forks included. -// -// This method is a helper for the chain reader. It should never be exposed to the -// outside world. -func readAllHashes(db ethdb.Iteratee, number uint64) []common.Hash { +func ReadAllHashes(db ethdb.Iteratee, number uint64) []common.Hash { prefix := headerKeyPrefix(number) hashes := make([]common.Hash, 0, 1) it := db.NewIteratorWithPrefix(prefix) + defer it.Release() + for it.Next() { if key := it.Key(); len(key) == len(prefix)+32 { hashes = append(hashes, common.BytesToHash(key[len(key)-32:])) @@ -74,7 +80,7 @@ func readAllHashes(db ethdb.Iteratee, number uint64) []common.Hash { } // ReadHeaderNumber returns the header number assigned to a hash. -func ReadHeaderNumber(db ethdb.Reader, hash common.Hash) *uint64 { +func ReadHeaderNumber(db ethdb.KeyValueReader, hash common.Hash) *uint64 { data, _ := db.Get(headerNumberKey(hash)) if len(data) != 8 { return nil @@ -83,8 +89,15 @@ func ReadHeaderNumber(db ethdb.Reader, hash common.Hash) *uint64 { return &number } +// DeleteHeaderNumber removes hash to number mapping. +func DeleteHeaderNumber(db ethdb.KeyValueWriter, hash common.Hash) { + if err := db.Delete(headerNumberKey(hash)); err != nil { + log.Crit("Failed to delete hash to number mapping", "err", err) + } +} + // ReadHeadHeaderHash retrieves the hash of the current canonical head header. -func ReadHeadHeaderHash(db ethdb.Reader) common.Hash { +func ReadHeadHeaderHash(db ethdb.KeyValueReader) common.Hash { data, _ := db.Get(headHeaderKey) if len(data) == 0 { return common.Hash{} @@ -93,14 +106,14 @@ func ReadHeadHeaderHash(db ethdb.Reader) common.Hash { } // WriteHeadHeaderHash stores the hash of the current canonical head header. -func WriteHeadHeaderHash(db ethdb.Writer, hash common.Hash) { +func WriteHeadHeaderHash(db ethdb.KeyValueWriter, hash common.Hash) { if err := db.Put(headHeaderKey, hash.Bytes()); err != nil { log.Crit("Failed to store last header's hash", "err", err) } } // ReadHeadBlockHash retrieves the hash of the current canonical head block. -func ReadHeadBlockHash(db ethdb.Reader) common.Hash { +func ReadHeadBlockHash(db ethdb.KeyValueReader) common.Hash { data, _ := db.Get(headBlockKey) if len(data) == 0 { return common.Hash{} @@ -109,14 +122,14 @@ func ReadHeadBlockHash(db ethdb.Reader) common.Hash { } // WriteHeadBlockHash stores the head block's hash. -func WriteHeadBlockHash(db ethdb.Writer, hash common.Hash) { +func WriteHeadBlockHash(db ethdb.KeyValueWriter, hash common.Hash) { if err := db.Put(headBlockKey, hash.Bytes()); err != nil { log.Crit("Failed to store last block's hash", "err", err) } } // ReadHeadFastBlockHash retrieves the hash of the current fast-sync head block. -func ReadHeadFastBlockHash(db ethdb.Reader) common.Hash { +func ReadHeadFastBlockHash(db ethdb.KeyValueReader) common.Hash { data, _ := db.Get(headFastBlockKey) if len(data) == 0 { return common.Hash{} @@ -125,7 +138,7 @@ func ReadHeadFastBlockHash(db ethdb.Reader) common.Hash { } // WriteHeadFastBlockHash stores the hash of the current fast-sync head block. -func WriteHeadFastBlockHash(db ethdb.Writer, hash common.Hash) { +func WriteHeadFastBlockHash(db ethdb.KeyValueWriter, hash common.Hash) { if err := db.Put(headFastBlockKey, hash.Bytes()); err != nil { log.Crit("Failed to store last fast block's hash", "err", err) } @@ -133,7 +146,7 @@ func WriteHeadFastBlockHash(db ethdb.Writer, hash common.Hash) { // ReadFastTrieProgress retrieves the number of tries nodes fast synced to allow // reporting correct numbers across restarts. -func ReadFastTrieProgress(db ethdb.Reader) uint64 { +func ReadFastTrieProgress(db ethdb.KeyValueReader) uint64 { data, _ := db.Get(fastTrieProgressKey) if len(data) == 0 { return 0 @@ -143,24 +156,31 @@ func ReadFastTrieProgress(db ethdb.Reader) uint64 { // WriteFastTrieProgress stores the fast sync trie process counter to support // retrieving it across restarts. -func WriteFastTrieProgress(db ethdb.Writer, count uint64) { +func WriteFastTrieProgress(db ethdb.KeyValueWriter, count uint64) { if err := db.Put(fastTrieProgressKey, new(big.Int).SetUint64(count).Bytes()); err != nil { log.Crit("Failed to store fast sync trie progress", "err", err) } } // ReadHeaderRLP retrieves a block header in its raw RLP database encoding. -func ReadHeaderRLP(db ethdb.AncientReader, hash common.Hash, number uint64) rlp.RawValue { - data, _ := db.Ancient("headers", number) +func ReadHeaderRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { + data, _ := db.Ancient(freezerHeaderTable, number) if len(data) == 0 { data, _ = db.Get(headerKey(number, hash)) + // In the background freezer is moving data from leveldb to flatten files. + // So during the first check for ancient db, the data is not yet in there, + // but when we reach into leveldb, the data was already moved. That would + // result in a not found error. + if len(data) == 0 { + data, _ = db.Ancient(freezerHeaderTable, number) + } } return data } // HasHeader verifies the existence of a block header corresponding to the hash. -func HasHeader(db ethdb.AncientReader, hash common.Hash, number uint64) bool { - if has, err := db.Ancient("hashes", number); err == nil && common.BytesToHash(has) == hash { +func HasHeader(db ethdb.Reader, hash common.Hash, number uint64) bool { + if has, err := db.Ancient(freezerHashTable, number); err == nil && common.BytesToHash(has) == hash { return true } if has, err := db.Has(headerKey(number, hash)); !has || err != nil { @@ -170,7 +190,7 @@ func HasHeader(db ethdb.AncientReader, hash common.Hash, number uint64) bool { } // ReadHeader retrieves the block header corresponding to the hash. -func ReadHeader(db ethdb.AncientReader, hash common.Hash, number uint64) *types.Header { +func ReadHeader(db ethdb.Reader, hash common.Hash, number uint64) *types.Header { data := ReadHeaderRLP(db, hash, number) if len(data) == 0 { return nil @@ -185,7 +205,7 @@ func ReadHeader(db ethdb.AncientReader, hash common.Hash, number uint64) *types. // WriteHeader stores a block header into the database and also stores the hash- // to-number mapping. -func WriteHeader(db ethdb.Writer, header *types.Header) { +func WriteHeader(db ethdb.KeyValueWriter, header *types.Header) { // Write the hash -> number mapping var ( hash = header.Hash() @@ -208,7 +228,7 @@ func WriteHeader(db ethdb.Writer, header *types.Header) { } // DeleteHeader removes all block header data associated with a hash. -func DeleteHeader(db ethdb.Writer, hash common.Hash, number uint64) { +func DeleteHeader(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { deleteHeaderWithoutNumber(db, hash, number) if err := db.Delete(headerNumberKey(hash)); err != nil { log.Crit("Failed to delete hash to number mapping", "err", err) @@ -217,31 +237,38 @@ func DeleteHeader(db ethdb.Writer, hash common.Hash, number uint64) { // deleteHeaderWithoutNumber removes only the block header but does not remove // the hash to number mapping. -func deleteHeaderWithoutNumber(db ethdb.Writer, hash common.Hash, number uint64) { +func deleteHeaderWithoutNumber(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { if err := db.Delete(headerKey(number, hash)); err != nil { log.Crit("Failed to delete header", "err", err) } } // ReadBodyRLP retrieves the block body (transactions and uncles) in RLP encoding. -func ReadBodyRLP(db ethdb.AncientReader, hash common.Hash, number uint64) rlp.RawValue { - data, _ := db.Ancient("bodies", number) +func ReadBodyRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { + data, _ := db.Ancient(freezerBodiesTable, number) if len(data) == 0 { data, _ = db.Get(blockBodyKey(number, hash)) + // In the background freezer is moving data from leveldb to flatten files. + // So during the first check for ancient db, the data is not yet in there, + // but when we reach into leveldb, the data was already moved. That would + // result in a not found error. + if len(data) == 0 { + data, _ = db.Ancient(freezerBodiesTable, number) + } } return data } // WriteBodyRLP stores an RLP encoded block body into the database. -func WriteBodyRLP(db ethdb.Writer, hash common.Hash, number uint64, rlp rlp.RawValue) { +func WriteBodyRLP(db ethdb.KeyValueWriter, hash common.Hash, number uint64, rlp rlp.RawValue) { if err := db.Put(blockBodyKey(number, hash), rlp); err != nil { log.Crit("Failed to store block body", "err", err) } } // HasBody verifies the existence of a block body corresponding to the hash. -func HasBody(db ethdb.AncientReader, hash common.Hash, number uint64) bool { - if has, err := db.Ancient("hashes", number); err == nil && common.BytesToHash(has) == hash { +func HasBody(db ethdb.Reader, hash common.Hash, number uint64) bool { + if has, err := db.Ancient(freezerHashTable, number); err == nil && common.BytesToHash(has) == hash { return true } if has, err := db.Has(blockBodyKey(number, hash)); !has || err != nil { @@ -251,7 +278,7 @@ func HasBody(db ethdb.AncientReader, hash common.Hash, number uint64) bool { } // ReadBody retrieves the block body corresponding to the hash. -func ReadBody(db ethdb.AncientReader, hash common.Hash, number uint64) *types.Body { +func ReadBody(db ethdb.Reader, hash common.Hash, number uint64) *types.Body { data := ReadBodyRLP(db, hash, number) if len(data) == 0 { return nil @@ -265,7 +292,7 @@ func ReadBody(db ethdb.AncientReader, hash common.Hash, number uint64) *types.Bo } // WriteBody stores a block body into the database. -func WriteBody(db ethdb.Writer, hash common.Hash, number uint64, body *types.Body) { +func WriteBody(db ethdb.KeyValueWriter, hash common.Hash, number uint64, body *types.Body) { data, err := rlp.EncodeToBytes(body) if err != nil { log.Crit("Failed to RLP encode body", "err", err) @@ -274,23 +301,30 @@ func WriteBody(db ethdb.Writer, hash common.Hash, number uint64, body *types.Bod } // DeleteBody removes all block body data associated with a hash. -func DeleteBody(db ethdb.Writer, hash common.Hash, number uint64) { +func DeleteBody(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { if err := db.Delete(blockBodyKey(number, hash)); err != nil { log.Crit("Failed to delete block body", "err", err) } } // ReadTdRLP retrieves a block's total difficulty corresponding to the hash in RLP encoding. -func ReadTdRLP(db ethdb.AncientReader, hash common.Hash, number uint64) rlp.RawValue { - data, _ := db.Ancient("diffs", number) +func ReadTdRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { + data, _ := db.Ancient(freezerDifficultyTable, number) if len(data) == 0 { data, _ = db.Get(headerTDKey(number, hash)) + // In the background freezer is moving data from leveldb to flatten files. + // So during the first check for ancient db, the data is not yet in there, + // but when we reach into leveldb, the data was already moved. That would + // result in a not found error. + if len(data) == 0 { + data, _ = db.Ancient(freezerDifficultyTable, number) + } } return data } // ReadTd retrieves a block's total difficulty corresponding to the hash. -func ReadTd(db ethdb.AncientReader, hash common.Hash, number uint64) *big.Int { +func ReadTd(db ethdb.Reader, hash common.Hash, number uint64) *big.Int { data := ReadTdRLP(db, hash, number) if len(data) == 0 { return nil @@ -304,7 +338,7 @@ func ReadTd(db ethdb.AncientReader, hash common.Hash, number uint64) *big.Int { } // WriteTd stores the total difficulty of a block into the database. -func WriteTd(db ethdb.Writer, hash common.Hash, number uint64, td *big.Int) { +func WriteTd(db ethdb.KeyValueWriter, hash common.Hash, number uint64, td *big.Int) { data, err := rlp.EncodeToBytes(td) if err != nil { log.Crit("Failed to RLP encode block total difficulty", "err", err) @@ -315,7 +349,7 @@ func WriteTd(db ethdb.Writer, hash common.Hash, number uint64, td *big.Int) { } // DeleteTd removes all block total difficulty data associated with a hash. -func DeleteTd(db ethdb.Writer, hash common.Hash, number uint64) { +func DeleteTd(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { if err := db.Delete(headerTDKey(number, hash)); err != nil { log.Crit("Failed to delete block total difficulty", "err", err) } @@ -323,8 +357,8 @@ func DeleteTd(db ethdb.Writer, hash common.Hash, number uint64) { // HasReceipts verifies the existence of all the transaction receipts belonging // to a block. -func HasReceipts(db ethdb.AncientReader, hash common.Hash, number uint64) bool { - if has, err := db.Ancient("hashes", number); err == nil && common.BytesToHash(has) == hash { +func HasReceipts(db ethdb.Reader, hash common.Hash, number uint64) bool { + if has, err := db.Ancient(freezerHashTable, number); err == nil && common.BytesToHash(has) == hash { return true } if has, err := db.Has(blockReceiptsKey(number, hash)); !has || err != nil { @@ -334,10 +368,17 @@ func HasReceipts(db ethdb.AncientReader, hash common.Hash, number uint64) bool { } // ReadReceiptsRLP retrieves all the transaction receipts belonging to a block in RLP encoding. -func ReadReceiptsRLP(db ethdb.AncientReader, hash common.Hash, number uint64) rlp.RawValue { - data, _ := db.Ancient("receipts", number) +func ReadReceiptsRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { + data, _ := db.Ancient(freezerReceiptTable, number) if len(data) == 0 { data, _ = db.Get(blockReceiptsKey(number, hash)) + // In the background freezer is moving data from leveldb to flatten files. + // So during the first check for ancient db, the data is not yet in there, + // but when we reach into leveldb, the data was already moved. That would + // result in a not found error. + if len(data) == 0 { + data, _ = db.Ancient(freezerReceiptTable, number) + } } return data } @@ -345,7 +386,7 @@ func ReadReceiptsRLP(db ethdb.AncientReader, hash common.Hash, number uint64) rl // ReadRawReceipts retrieves all the transaction receipts belonging to a block. // The receipt metadata fields are not guaranteed to be populated, so they // should not be used. Use ReadReceipts instead if the metadata is needed. -func ReadRawReceipts(db ethdb.AncientReader, hash common.Hash, number uint64) types.Receipts { +func ReadRawReceipts(db ethdb.Reader, hash common.Hash, number uint64) types.Receipts { // Retrieve the flattened receipt slice data := ReadReceiptsRLP(db, hash, number) if len(data) == 0 { @@ -371,7 +412,7 @@ func ReadRawReceipts(db ethdb.AncientReader, hash common.Hash, number uint64) ty // The current implementation populates these metadata fields by reading the receipts' // corresponding block body, so if the block body is not found it will return nil even // if the receipt itself is stored. -func ReadReceipts(db ethdb.AncientReader, hash common.Hash, number uint64, config *params.ChainConfig) types.Receipts { +func ReadReceipts(db ethdb.Reader, hash common.Hash, number uint64, config *params.ChainConfig) types.Receipts { // We're deriving many fields from the block body, retrieve beside the receipt receipts := ReadRawReceipts(db, hash, number) if receipts == nil { @@ -390,7 +431,7 @@ func ReadReceipts(db ethdb.AncientReader, hash common.Hash, number uint64, confi } // WriteReceipts stores all the transaction receipts belonging to a block. -func WriteReceipts(db ethdb.Writer, hash common.Hash, number uint64, receipts types.Receipts) { +func WriteReceipts(db ethdb.KeyValueWriter, hash common.Hash, number uint64, receipts types.Receipts) { // Convert the receipts into their storage form and serialize them storageReceipts := make([]*types.ReceiptForStorage, len(receipts)) for i, receipt := range receipts { @@ -407,7 +448,7 @@ func WriteReceipts(db ethdb.Writer, hash common.Hash, number uint64, receipts ty } // DeleteReceipts removes all receipt data associated with a block hash. -func DeleteReceipts(db ethdb.Writer, hash common.Hash, number uint64) { +func DeleteReceipts(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { if err := db.Delete(blockReceiptsKey(number, hash)); err != nil { log.Crit("Failed to delete block receipts", "err", err) } @@ -419,7 +460,7 @@ func DeleteReceipts(db ethdb.Writer, hash common.Hash, number uint64) { // // Note, due to concurrent download of header and block body the header and thus // canonical hash can be stored in the database but the body data not (yet). -func ReadBlock(db ethdb.AncientReader, hash common.Hash, number uint64) *types.Block { +func ReadBlock(db ethdb.Reader, hash common.Hash, number uint64) *types.Block { header := ReadHeader(db, hash, number) if header == nil { return nil @@ -432,22 +473,53 @@ func ReadBlock(db ethdb.AncientReader, hash common.Hash, number uint64) *types.B } // WriteBlock serializes a block into the database, header and body separately. -func WriteBlock(db ethdb.Writer, block *types.Block) { +func WriteBlock(db ethdb.KeyValueWriter, block *types.Block) { WriteBody(db, block.Hash(), block.NumberU64(), block.Body()) WriteHeader(db, block.Header()) } +// WriteAncientBlock writes entire block data into ancient store and returns the total written size. +func WriteAncientBlock(db ethdb.AncientWriter, block *types.Block, receipts types.Receipts, td *big.Int) int { + // Encode all block components to RLP format. + headerBlob, err := rlp.EncodeToBytes(block.Header()) + if err != nil { + log.Crit("Failed to RLP encode block header", "err", err) + } + bodyBlob, err := rlp.EncodeToBytes(block.Body()) + if err != nil { + log.Crit("Failed to RLP encode body", "err", err) + } + storageReceipts := make([]*types.ReceiptForStorage, len(receipts)) + for i, receipt := range receipts { + storageReceipts[i] = (*types.ReceiptForStorage)(receipt) + } + receiptBlob, err := rlp.EncodeToBytes(storageReceipts) + if err != nil { + log.Crit("Failed to RLP encode block receipts", "err", err) + } + tdBlob, err := rlp.EncodeToBytes(td) + if err != nil { + log.Crit("Failed to RLP encode block total difficulty", "err", err) + } + // Write all blob to flatten files. + err = db.AppendAncient(block.NumberU64(), block.Hash().Bytes(), headerBlob, bodyBlob, receiptBlob, tdBlob) + if err != nil { + log.Crit("Failed to write block data to ancient store", "err", err) + } + return len(headerBlob) + len(bodyBlob) + len(receiptBlob) + len(tdBlob) + common.HashLength +} + // DeleteBlock removes all block data associated with a hash. -func DeleteBlock(db ethdb.Writer, hash common.Hash, number uint64) { +func DeleteBlock(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { DeleteReceipts(db, hash, number) DeleteHeader(db, hash, number) DeleteBody(db, hash, number) DeleteTd(db, hash, number) } -// deleteBlockWithoutNumber removes all block data associated with a hash, except +// DeleteBlockWithoutNumber removes all block data associated with a hash, except // the hash to number mapping. -func deleteBlockWithoutNumber(db ethdb.Writer, hash common.Hash, number uint64) { +func DeleteBlockWithoutNumber(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { DeleteReceipts(db, hash, number) deleteHeaderWithoutNumber(db, hash, number) DeleteBody(db, hash, number) @@ -455,7 +527,7 @@ func deleteBlockWithoutNumber(db ethdb.Writer, hash common.Hash, number uint64) } // FindCommonAncestor returns the last common ancestor of two block headers -func FindCommonAncestor(db ethdb.AncientReader, a, b *types.Header) *types.Header { +func FindCommonAncestor(db ethdb.Reader, a, b *types.Header) *types.Header { for bn := b.Number.Uint64(); a.Number.Uint64() > bn; { a = ReadHeader(db, a.ParentHash, a.Number.Uint64()-1) if a == nil { diff --git a/core/rawdb/accessors_indexes.go b/core/rawdb/accessors_indexes.go index 666e3edff..ed1f1bca6 100644 --- a/core/rawdb/accessors_indexes.go +++ b/core/rawdb/accessors_indexes.go @@ -54,7 +54,7 @@ func ReadTxLookupEntry(db ethdb.Reader, hash common.Hash) *uint64 { // WriteTxLookupEntries stores a positional metadata for every transaction from // a block, enabling hash based transaction and receipt lookups. -func WriteTxLookupEntries(db ethdb.Writer, block *types.Block) { +func WriteTxLookupEntries(db ethdb.KeyValueWriter, block *types.Block) { for _, tx := range block.Transactions() { if err := db.Put(txLookupKey(tx.Hash()), block.Number().Bytes()); err != nil { log.Crit("Failed to store transaction lookup entry", "err", err) @@ -63,13 +63,13 @@ func WriteTxLookupEntries(db ethdb.Writer, block *types.Block) { } // DeleteTxLookupEntry removes all transaction data associated with a hash. -func DeleteTxLookupEntry(db ethdb.Writer, hash common.Hash) { +func DeleteTxLookupEntry(db ethdb.KeyValueWriter, hash common.Hash) { db.Delete(txLookupKey(hash)) } // ReadTransaction retrieves a specific transaction from the database, along with // its added positional metadata. -func ReadTransaction(db ethdb.AncientReader, hash common.Hash) (*types.Transaction, common.Hash, uint64, uint64) { +func ReadTransaction(db ethdb.Reader, hash common.Hash) (*types.Transaction, common.Hash, uint64, uint64) { blockNumber := ReadTxLookupEntry(db, hash) if blockNumber == nil { return nil, common.Hash{}, 0, 0 @@ -94,7 +94,7 @@ func ReadTransaction(db ethdb.AncientReader, hash common.Hash) (*types.Transacti // ReadReceipt retrieves a specific transaction receipt from the database, along with // its added positional metadata. -func ReadReceipt(db ethdb.AncientReader, hash common.Hash, config *params.ChainConfig) (*types.Receipt, common.Hash, uint64, uint64) { +func ReadReceipt(db ethdb.Reader, hash common.Hash, config *params.ChainConfig) (*types.Receipt, common.Hash, uint64, uint64) { // Retrieve the context of the receipt based on the transaction hash blockNumber := ReadTxLookupEntry(db, hash) if blockNumber == nil { @@ -117,13 +117,13 @@ func ReadReceipt(db ethdb.AncientReader, hash common.Hash, config *params.ChainC // ReadBloomBits retrieves the compressed bloom bit vector belonging to the given // section and bit index from the. -func ReadBloomBits(db ethdb.Reader, bit uint, section uint64, head common.Hash) ([]byte, error) { +func ReadBloomBits(db ethdb.KeyValueReader, bit uint, section uint64, head common.Hash) ([]byte, error) { return db.Get(bloomBitsKey(bit, section, head)) } // WriteBloomBits stores the compressed bloom bits vector belonging to the given // section and bit index. -func WriteBloomBits(db ethdb.Writer, bit uint, section uint64, head common.Hash, bits []byte) { +func WriteBloomBits(db ethdb.KeyValueWriter, bit uint, section uint64, head common.Hash, bits []byte) { if err := db.Put(bloomBitsKey(bit, section, head), bits); err != nil { log.Crit("Failed to store bloom bits", "err", err) } diff --git a/core/rawdb/accessors_metadata.go b/core/rawdb/accessors_metadata.go index 1361b0d73..f8d09fbdd 100644 --- a/core/rawdb/accessors_metadata.go +++ b/core/rawdb/accessors_metadata.go @@ -27,7 +27,7 @@ import ( ) // ReadDatabaseVersion retrieves the version number of the database. -func ReadDatabaseVersion(db ethdb.Reader) *uint64 { +func ReadDatabaseVersion(db ethdb.KeyValueReader) *uint64 { var version uint64 enc, _ := db.Get(databaseVerisionKey) @@ -42,7 +42,7 @@ func ReadDatabaseVersion(db ethdb.Reader) *uint64 { } // WriteDatabaseVersion stores the version number of the database -func WriteDatabaseVersion(db ethdb.Writer, version uint64) { +func WriteDatabaseVersion(db ethdb.KeyValueWriter, version uint64) { enc, err := rlp.EncodeToBytes(version) if err != nil { log.Crit("Failed to encode database version", "err", err) @@ -53,7 +53,7 @@ func WriteDatabaseVersion(db ethdb.Writer, version uint64) { } // ReadChainConfig retrieves the consensus settings based on the given genesis hash. -func ReadChainConfig(db ethdb.Reader, hash common.Hash) *params.ChainConfig { +func ReadChainConfig(db ethdb.KeyValueReader, hash common.Hash) *params.ChainConfig { data, _ := db.Get(configKey(hash)) if len(data) == 0 { return nil @@ -67,7 +67,7 @@ func ReadChainConfig(db ethdb.Reader, hash common.Hash) *params.ChainConfig { } // WriteChainConfig writes the chain config settings to the database. -func WriteChainConfig(db ethdb.Writer, hash common.Hash, cfg *params.ChainConfig) { +func WriteChainConfig(db ethdb.KeyValueWriter, hash common.Hash, cfg *params.ChainConfig) { if cfg == nil { return } @@ -81,13 +81,13 @@ func WriteChainConfig(db ethdb.Writer, hash common.Hash, cfg *params.ChainConfig } // ReadPreimage retrieves a single preimage of the provided hash. -func ReadPreimage(db ethdb.Reader, hash common.Hash) []byte { +func ReadPreimage(db ethdb.KeyValueReader, hash common.Hash) []byte { data, _ := db.Get(preimageKey(hash)) return data } // WritePreimages writes the provided set of preimages to the database. -func WritePreimages(db ethdb.Writer, preimages map[common.Hash][]byte) { +func WritePreimages(db ethdb.KeyValueWriter, preimages map[common.Hash][]byte) { for hash, preimage := range preimages { if err := db.Put(preimageKey(hash), preimage); err != nil { log.Crit("Failed to store trie preimage", "err", err) diff --git a/core/rawdb/database.go b/core/rawdb/database.go index cd1048cbc..5a3c7f94b 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -24,7 +24,7 @@ import ( "github.com/ethereum/go-ethereum/ethdb/memorydb" ) -// freezerdb is a databse wrapper that enabled freezer data retrievals. +// freezerdb is a database wrapper that enabled freezer data retrievals. type freezerdb struct { ethdb.KeyValueStore ethdb.AncientStore @@ -51,9 +51,34 @@ type nofreezedb struct { ethdb.KeyValueStore } -// Frozen returns nil as we don't have a backing chain freezer. +// HasAncient returns an error as we don't have a backing chain freezer. +func (db *nofreezedb) HasAncient(kind string, number uint64) (bool, error) { + return false, errNotSupported +} + +// Ancient returns an error as we don't have a backing chain freezer. func (db *nofreezedb) Ancient(kind string, number uint64) ([]byte, error) { - return nil, errOutOfBounds + return nil, errNotSupported +} + +// Ancients returns an error as we don't have a backing chain freezer. +func (db *nofreezedb) Ancients() (uint64, error) { + return 0, errNotSupported +} + +// AppendAncient returns an error as we don't have a backing chain freezer. +func (db *nofreezedb) AppendAncient(number uint64, hash, header, body, receipts, td []byte) error { + return errNotSupported +} + +// TruncateAncients returns an error as we don't have a backing chain freezer. +func (db *nofreezedb) TruncateAncients(items uint64) error { + return errNotSupported +} + +// Sync returns an error as we don't have a backing chain freezer. +func (db *nofreezedb) Sync() error { + return errNotSupported } // NewDatabase creates a high level database on top of a given key-value data diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go index 07df4c759..84426d8ae 100644 --- a/core/rawdb/freezer.go +++ b/core/rawdb/freezer.go @@ -31,9 +31,15 @@ import ( "github.com/prometheus/tsdb/fileutil" ) -// errUnknownTable is returned if the user attempts to read from a table that is -// not tracked by the freezer. -var errUnknownTable = errors.New("unknown table") +var ( + // errUnknownTable is returned if the user attempts to read from a table that is + // not tracked by the freezer. + errUnknownTable = errors.New("unknown table") + + // errOutOrderInsertion is returned if the user attempts to inject out-of-order + // binary blobs into the freezer. + errOutOrderInsertion = errors.New("the append operation is out-order") +) const ( // freezerRecheckInterval is the frequency to check the key-value database for @@ -44,7 +50,7 @@ const ( // freezerBlockGraduation is the number of confirmations a block must achieve // before it becomes elligible for chain freezing. This must exceed any chain // reorg depth, since the freezer also deletes all block siblings. - freezerBlockGraduation = 60000 + freezerBlockGraduation = 90000 // freezerBatchLimit is the maximum number of blocks to freeze in one batch // before doing an fsync and deleting it from the key-value store. @@ -72,7 +78,9 @@ func newFreezer(datadir string, namespace string) (*freezer, error) { readMeter = metrics.NewRegisteredMeter(namespace+"ancient/read", nil) writeMeter = metrics.NewRegisteredMeter(namespace+"ancient/write", nil) ) - lock, _, err := fileutil.Flock(filepath.Join(datadir, "LOCK")) + // Leveldb uses LOCK as the filelock filename. To prevent the + // name collision, we use FLOCK as the lock name. + lock, _, err := fileutil.Flock(filepath.Join(datadir, "FLOCK")) if err != nil { return nil, err } @@ -81,7 +89,7 @@ func newFreezer(datadir string, namespace string) (*freezer, error) { tables: make(map[string]*freezerTable), instanceLock: lock, } - for _, name := range []string{"hashes", "headers", "bodies", "receipts", "diffs"} { + for _, name := range []string{freezerHashTable, freezerHeaderTable, freezerBodiesTable, freezerReceiptTable, freezerDifficultyTable} { table, err := newTable(datadir, name, readMeter, writeMeter) if err != nil { for _, table := range freezer.tables { @@ -92,21 +100,12 @@ func newFreezer(datadir string, namespace string) (*freezer, error) { } freezer.tables[name] = table } - // Truncate all data tables to the same length - freezer.frozen = math.MaxUint64 - for _, table := range freezer.tables { - if freezer.frozen > table.items { - freezer.frozen = table.items - } - } - for _, table := range freezer.tables { - if err := table.truncate(freezer.frozen); err != nil { - for _, table := range freezer.tables { - table.Close() - } - lock.Release() - return nil, err + if err := freezer.repair(); err != nil { + for _, table := range freezer.tables { + table.Close() } + lock.Release() + return nil, err } return freezer, nil } @@ -128,8 +127,91 @@ func (f *freezer) Close() error { return nil } +// HasAncient returns an indicator whether the specified ancient data exists +// in the freezer. +func (f *freezer) HasAncient(kind string, number uint64) (bool, error) { + if table := f.tables[kind]; table != nil { + return table.has(number), nil + } + return false, nil +} + +// Ancient retrieves an ancient binary blob from the append-only immutable files. +func (f *freezer) Ancient(kind string, number uint64) ([]byte, error) { + if table := f.tables[kind]; table != nil { + return table.Retrieve(number) + } + return nil, errUnknownTable +} + +// Ancients returns the length of the frozen items. +func (f *freezer) Ancients() (uint64, error) { + return atomic.LoadUint64(&f.frozen), nil +} + +// AppendAncient injects all binary blobs belong to block at the end of the +// append-only immutable table files. +// +// Notably, this function is lock free but kind of thread-safe. All out-of-order +// injection will be rejected. But if two injections with same number happen at +// the same time, we can get into the trouble. +func (f *freezer) AppendAncient(number uint64, hash, header, body, receipts, td []byte) (err error) { + // Ensure the binary blobs we are appending is continuous with freezer. + if atomic.LoadUint64(&f.frozen) != number { + return errOutOrderInsertion + } + // Rollback all inserted data if any insertion below failed to ensure + // the tables won't out of sync. + defer func() { + if err != nil { + rerr := f.repair() + if rerr != nil { + log.Crit("Failed to repair freezer", "err", rerr) + } + log.Info("Append ancient failed", "number", number, "err", err) + } + }() + // Inject all the components into the relevant data tables + if err := f.tables[freezerHashTable].Append(f.frozen, hash[:]); err != nil { + log.Error("Failed to append ancient hash", "number", f.frozen, "hash", hash, "err", err) + return err + } + if err := f.tables[freezerHeaderTable].Append(f.frozen, header); err != nil { + log.Error("Failed to append ancient header", "number", f.frozen, "hash", hash, "err", err) + return err + } + if err := f.tables[freezerBodiesTable].Append(f.frozen, body); err != nil { + log.Error("Failed to append ancient body", "number", f.frozen, "hash", hash, "err", err) + return err + } + if err := f.tables[freezerReceiptTable].Append(f.frozen, receipts); err != nil { + log.Error("Failed to append ancient receipts", "number", f.frozen, "hash", hash, "err", err) + return err + } + if err := f.tables[freezerDifficultyTable].Append(f.frozen, td); err != nil { + log.Error("Failed to append ancient difficulty", "number", f.frozen, "hash", hash, "err", err) + return err + } + atomic.AddUint64(&f.frozen, 1) // Only modify atomically + return nil +} + +// Truncate discards any recent data above the provided threshold number. +func (f *freezer) TruncateAncients(items uint64) error { + if atomic.LoadUint64(&f.frozen) <= items { + return nil + } + for _, table := range f.tables { + if err := table.truncate(items); err != nil { + return err + } + } + atomic.StoreUint64(&f.frozen, items) + return nil +} + // sync flushes all data tables to disk. -func (f *freezer) sync() error { +func (f *freezer) Sync() error { var errs []error for _, table := range f.tables { if err := table.Sync(); err != nil { @@ -142,14 +224,6 @@ func (f *freezer) sync() error { return nil } -// Ancient retrieves an ancient binary blob from the append-only immutable files. -func (f *freezer) Ancient(kind string, number uint64) ([]byte, error) { - if table := f.tables[kind]; table != nil { - return table.Retrieve(number) - } - return nil, errUnknownTable -} - // freeze is a background thread that periodically checks the blockchain for any // import progress and moves ancient data from the fast database into the freezer. // @@ -159,25 +233,22 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) { nfdb := &nofreezedb{KeyValueStore: db} for { - // Retrieve the freezing threshold. In theory we're interested only in full - // blocks post-sync, but that would keep the live database enormous during - // dast sync. By picking the fast block, we still get to deep freeze all the - // final immutable data without having to wait for sync to finish. - hash := ReadHeadFastBlockHash(nfdb) + // Retrieve the freezing threshold. + hash := ReadHeadBlockHash(nfdb) if hash == (common.Hash{}) { - log.Debug("Current fast block hash unavailable") // new chain, empty database + log.Debug("Current full block hash unavailable") // new chain, empty database time.Sleep(freezerRecheckInterval) continue } number := ReadHeaderNumber(nfdb, hash) switch { case number == nil: - log.Error("Current fast block number unavailable", "hash", hash) + log.Error("Current full block number unavailable", "hash", hash) time.Sleep(freezerRecheckInterval) continue case *number < freezerBlockGraduation: - log.Debug("Current fast block not old enough", "number", *number, "hash", hash, "delay", freezerBlockGraduation) + log.Debug("Current full block not old enough", "number", *number, "hash", hash, "delay", freezerBlockGraduation) time.Sleep(freezerRecheckInterval) continue @@ -188,7 +259,7 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) { } head := ReadHeader(nfdb, hash, *number) if head == nil { - log.Error("Current fast block unavailable", "number", *number, "hash", hash) + log.Error("Current full block unavailable", "number", *number, "hash", hash) time.Sleep(freezerRecheckInterval) continue } @@ -229,48 +300,35 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) { log.Error("Total difficulty missing, can't freeze", "number", f.frozen, "hash", hash) break } - // Inject all the components into the relevant data tables - if err := f.tables["hashes"].Append(f.frozen, hash[:]); err != nil { - log.Error("Failed to deep freeze hash", "number", f.frozen, "hash", hash, "err", err) - break - } - if err := f.tables["headers"].Append(f.frozen, header); err != nil { - log.Error("Failed to deep freeze header", "number", f.frozen, "hash", hash, "err", err) - break - } - if err := f.tables["bodies"].Append(f.frozen, body); err != nil { - log.Error("Failed to deep freeze body", "number", f.frozen, "hash", hash, "err", err) - break - } - if err := f.tables["receipts"].Append(f.frozen, receipts); err != nil { - log.Error("Failed to deep freeze receipts", "number", f.frozen, "hash", hash, "err", err) - break - } - if err := f.tables["diffs"].Append(f.frozen, td); err != nil { - log.Error("Failed to deep freeze difficulty", "number", f.frozen, "hash", hash, "err", err) - break - } log.Trace("Deep froze ancient block", "number", f.frozen, "hash", hash) - atomic.AddUint64(&f.frozen, 1) // Only modify atomically + // Inject all the components into the relevant data tables + if err := f.AppendAncient(f.frozen, hash[:], header, body, receipts, td); err != nil { + break + } ancients = append(ancients, hash) } // Batch of blocks have been frozen, flush them before wiping from leveldb - if err := f.sync(); err != nil { + if err := f.Sync(); err != nil { log.Crit("Failed to flush frozen tables", "err", err) } // Wipe out all data from the active database batch := db.NewBatch() + for i := 0; i < len(ancients); i++ { + DeleteBlockWithoutNumber(batch, ancients[i], first+uint64(i)) + DeleteCanonicalHash(batch, first+uint64(i)) + } + if err := batch.Write(); err != nil { + log.Crit("Failed to delete frozen canonical blocks", "err", err) + } + batch.Reset() + // Wipe out side chain also. for number := first; number < f.frozen; number++ { - for _, hash := range readAllHashes(db, number) { - if hash == ancients[number-first] { - deleteBlockWithoutNumber(batch, hash, number) - } else { - DeleteBlock(batch, hash, number) - } + for _, hash := range ReadAllHashes(db, number) { + DeleteBlock(batch, hash, number) } } if err := batch.Write(); err != nil { - log.Crit("Failed to delete frozen items", "err", err) + log.Crit("Failed to delete frozen side blocks", "err", err) } // Log something friendly for the user context := []interface{}{ @@ -287,3 +345,21 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) { } } } + +// repair truncates all data tables to the same length. +func (f *freezer) repair() error { + min := uint64(math.MaxUint64) + for _, table := range f.tables { + items := atomic.LoadUint64(&table.items) + if min > items { + min = items + } + } + for _, table := range f.tables { + if err := table.truncate(min); err != nil { + return err + } + } + atomic.StoreUint64(&f.frozen, min) + return nil +} diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index 313ac8b78..8e117301b 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -39,6 +39,9 @@ var ( // errOutOfBounds is returned if the item requested is not contained within the // freezer table. errOutOfBounds = errors.New("out of bounds") + + // errNotSupported is returned if the database doesn't support the required operation. + errNotSupported = errors.New("this operation is not supported") ) // indexEntry contains the number/id of the file that the data resides in, aswell as the @@ -451,7 +454,6 @@ func (t *freezerTable) getBounds(item uint64) (uint32, uint32, uint32, error) { // Retrieve looks up the data offset of an item with the given number and retrieves // the raw binary blob from the data file. func (t *freezerTable) Retrieve(item uint64) ([]byte, error) { - // Ensure the table and the item is accessible if t.index == nil || t.head == nil { return nil, errClosed @@ -483,6 +485,12 @@ func (t *freezerTable) Retrieve(item uint64) ([]byte, error) { return snappy.Decode(nil, blob) } +// has returns an indicator whether the specified number data +// exists in the freezer table. +func (t *freezerTable) has(number uint64) bool { + return atomic.LoadUint64(&t.items) > number +} + // Sync pushes any pending data from memory out to disk. This is an expensive // operation, so use it with care. func (t *freezerTable) Sync() error { diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index 62b60e2f3..ebca17252 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -63,6 +63,23 @@ var ( preimageHitCounter = metrics.NewRegisteredCounter("db/preimage/hits", nil) ) +const ( + // freezerHeaderTable indicates the name of the freezer header table. + freezerHeaderTable = "headers" + + // freezerHashTable indicates the name of the freezer canonical hash table. + freezerHashTable = "hashes" + + // freezerBodiesTable indicates the name of the freezer block body table. + freezerBodiesTable = "bodies" + + // freezerReceiptTable indicates the name of the freezer receipts table. + freezerReceiptTable = "receipts" + + // freezerDifficultyTable indicates the name of the freezer total difficulty table. + freezerDifficultyTable = "diffs" +) + // LegacyTxLookupEntry is the legacy TxLookupEntry definition with some unnecessary // fields. type LegacyTxLookupEntry struct { diff --git a/core/rawdb/table.go b/core/rawdb/table.go index 0b5e08b20..124678959 100644 --- a/core/rawdb/table.go +++ b/core/rawdb/table.go @@ -50,12 +50,42 @@ func (t *table) Get(key []byte) ([]byte, error) { return t.db.Get(append([]byte(t.prefix), key...)) } +// HasAncient is a noop passthrough that just forwards the request to the underlying +// database. +func (t *table) HasAncient(kind string, number uint64) (bool, error) { + return t.db.HasAncient(kind, number) +} + // Ancient is a noop passthrough that just forwards the request to the underlying // database. func (t *table) Ancient(kind string, number uint64) ([]byte, error) { return t.db.Ancient(kind, number) } +// Ancients is a noop passthrough that just forwards the request to the underlying +// database. +func (t *table) Ancients() (uint64, error) { + return t.db.Ancients() +} + +// AppendAncient is a noop passthrough that just forwards the request to the underlying +// database. +func (t *table) AppendAncient(number uint64, hash, header, body, receipts, td []byte) error { + return t.db.AppendAncient(number, hash, header, body, receipts, td) +} + +// TruncateAncients is a noop passthrough that just forwards the request to the underlying +// database. +func (t *table) TruncateAncients(items uint64) error { + return t.db.TruncateAncients(items) +} + +// Sync is a noop passthrough that just forwards the request to the underlying +// database. +func (t *table) Sync() error { + return t.db.Sync() +} + // Put inserts the given value into the database at a prefixed version of the // provided key. func (t *table) Put(key []byte, value []byte) error { @@ -163,6 +193,6 @@ func (b *tableBatch) Reset() { } // Replay replays the batch contents. -func (b *tableBatch) Replay(w ethdb.Writer) error { +func (b *tableBatch) Replay(w ethdb.KeyValueWriter) error { return b.batch.Replay(w) } diff --git a/core/state/database.go b/core/state/database.go index 8798b7380..ecc2c134d 100644 --- a/core/state/database.go +++ b/core/state/database.go @@ -93,7 +93,7 @@ type Trie interface { // If the trie does not contain a value for key, the returned proof contains all // nodes of the longest existing prefix of the key (at least the root), ending // with the node that proves the absence of the key. - Prove(key []byte, fromLevel uint, proofDb ethdb.Writer) error + Prove(key []byte, fromLevel uint, proofDb ethdb.KeyValueWriter) error } // NewDatabase creates a backing store for state. The returned database is safe for diff --git a/core/state/sync.go b/core/state/sync.go index e4a08d293..ef7930527 100644 --- a/core/state/sync.go +++ b/core/state/sync.go @@ -26,7 +26,7 @@ import ( ) // NewStateSync create a new state trie download scheduler. -func NewStateSync(root common.Hash, database ethdb.Reader, bloom *trie.SyncBloom) *trie.Sync { +func NewStateSync(root common.Hash, database ethdb.KeyValueReader, bloom *trie.SyncBloom) *trie.Sync { var syncer *trie.Sync callback := func(leaf []byte, parent common.Hash) error { var obj Account diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 0e19fe9e6..79107c8d1 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -129,6 +129,7 @@ type Downloader struct { synchronising int32 notified int32 committed int32 + ancientLimit uint64 // The maximum block number which can be regarded as ancient data. // Channels headerCh chan dataPack // [eth/62] Channel receiving inbound block headers @@ -206,7 +207,7 @@ type BlockChain interface { InsertChain(types.Blocks) (int, error) // InsertReceiptChain inserts a batch of receipts into the local chain. - InsertReceiptChain(types.Blocks, []types.Receipts) (int, error) + InsertReceiptChain(types.Blocks, []types.Receipts, uint64) (int, error) } // New creates a new downloader to fetch hashes and blocks from remote peers. @@ -475,12 +476,49 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I if d.mode == FastSync && pivot != 0 { d.committed = 0 } + if d.mode == FastSync { + // Set the ancient data limitation. + // If we are running fast sync, all block data not greater than ancientLimit will + // be written to the ancient store. Otherwise, block data will be written to active + // database and then wait freezer to migrate. + // + // If there is checkpoint available, then calculate the ancientLimit through + // checkpoint. Otherwise calculate the ancient limit through the advertised + // height by remote peer. + // + // The reason for picking checkpoint first is: there exists an attack vector + // for height that: a malicious peer can give us a fake(very high) height, + // so that the ancient limit is also very high. And then the peer start to + // feed us valid blocks until head. All of these blocks might be written into + // the ancient store, the safe region for freezer is not enough. + if d.checkpoint != 0 && d.checkpoint > MaxForkAncestry+1 { + d.ancientLimit = height - MaxForkAncestry - 1 + } else if height > MaxForkAncestry+1 { + d.ancientLimit = height - MaxForkAncestry - 1 + } + frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here. + // If a part of blockchain data has already been written into active store, + // disable the ancient style insertion explicitly. + if origin >= frozen && frozen != 0 { + d.ancientLimit = 0 + log.Info("Disabling direct-ancient mode", "origin", origin, "ancient", frozen-1) + } else if d.ancientLimit > 0 { + log.Debug("Enabling direct-ancient mode", "ancient", d.ancientLimit) + } + // Rewind the ancient store and blockchain if reorg happens. + if origin+1 < frozen { + var hashes []common.Hash + for i := origin + 1; i < d.lightchain.CurrentHeader().Number.Uint64(); i++ { + hashes = append(hashes, rawdb.ReadCanonicalHash(d.stateDB, i)) + } + d.lightchain.Rollback(hashes) + } + } // Initiate the sync using a concurrent header and content retrieval algorithm d.queue.Prepare(origin+1, d.mode) if d.syncInitHook != nil { d.syncInitHook(origin, height) } - fetchers := []func() error{ func() error { return d.fetchHeaders(p, origin+1, pivot) }, // Headers are always retrieved func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync @@ -544,6 +582,9 @@ func (d *Downloader) cancel() { func (d *Downloader) Cancel() { d.cancel() d.cancelWg.Wait() + + d.ancientLimit = 0 + log.Debug("Reset ancient limit to zero") } // Terminate interrupts the downloader, canceling all pending operations. @@ -1315,7 +1356,7 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv // queue until the stream ends or a failure occurs. func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error { // Keep a count of uncertain headers to roll back - rollback := []*types.Header{} + var rollback []*types.Header defer func() { if len(rollback) > 0 { // Flatten the headers and roll them back @@ -1409,11 +1450,10 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er limit = len(headers) } chunk := headers[:limit] - // In case of header only syncing, validate the chunk immediately if d.mode == FastSync || d.mode == LightSync { // Collect the yet unknown headers to mark them as uncertain - unknown := make([]*types.Header, 0, len(headers)) + unknown := make([]*types.Header, 0, len(chunk)) for _, header := range chunk { if !d.lightchain.HasHeader(header.Hash(), header.Number.Uint64()) { unknown = append(unknown, header) @@ -1663,7 +1703,7 @@ func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *state blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) receipts[i] = result.Receipts } - if index, err := d.blockchain.InsertReceiptChain(blocks, receipts); err != nil { + if index, err := d.blockchain.InsertReceiptChain(blocks, receipts, d.ancientLimit); err != nil { log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) return errInvalidChain } @@ -1675,7 +1715,7 @@ func (d *Downloader) commitPivotBlock(result *fetchResult) error { log.Debug("Committing fast sync pivot as new head", "number", block.Number(), "hash", block.Hash()) // Commit the pivot block as the new head, will require full sync from here on - if _, err := d.blockchain.InsertReceiptChain([]*types.Block{block}, []types.Receipts{result.Receipts}); err != nil { + if _, err := d.blockchain.InsertReceiptChain([]*types.Block{block}, []types.Receipts{result.Receipts}, d.ancientLimit); err != nil { return err } if err := d.blockchain.FastSyncCommitHead(block.Hash()); err != nil { diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index ac7f48abd..659aee088 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -57,6 +57,11 @@ type downloadTester struct { ownReceipts map[common.Hash]types.Receipts // Receipts belonging to the tester ownChainTd map[common.Hash]*big.Int // Total difficulties of the blocks in the local chain + ancientHeaders map[common.Hash]*types.Header // Ancient headers belonging to the tester + ancientBlocks map[common.Hash]*types.Block // Ancient blocks belonging to the tester + ancientReceipts map[common.Hash]types.Receipts // Ancient receipts belonging to the tester + ancientChainTd map[common.Hash]*big.Int // Ancient total difficulties of the blocks in the local chain + lock sync.RWMutex } @@ -71,6 +76,12 @@ func newTester() *downloadTester { ownBlocks: map[common.Hash]*types.Block{testGenesis.Hash(): testGenesis}, ownReceipts: map[common.Hash]types.Receipts{testGenesis.Hash(): nil}, ownChainTd: map[common.Hash]*big.Int{testGenesis.Hash(): testGenesis.Difficulty()}, + + // Initialize ancient store with test genesis block + ancientHeaders: map[common.Hash]*types.Header{testGenesis.Hash(): testGenesis.Header()}, + ancientBlocks: map[common.Hash]*types.Block{testGenesis.Hash(): testGenesis}, + ancientReceipts: map[common.Hash]types.Receipts{testGenesis.Hash(): nil}, + ancientChainTd: map[common.Hash]*big.Int{testGenesis.Hash(): testGenesis.Difficulty()}, } tester.stateDb = rawdb.NewMemoryDatabase() tester.stateDb.Put(testGenesis.Root().Bytes(), []byte{0x00}) @@ -122,6 +133,9 @@ func (dl *downloadTester) HasFastBlock(hash common.Hash, number uint64) bool { dl.lock.RLock() defer dl.lock.RUnlock() + if _, ok := dl.ancientReceipts[hash]; ok { + return true + } _, ok := dl.ownReceipts[hash] return ok } @@ -131,6 +145,10 @@ func (dl *downloadTester) GetHeaderByHash(hash common.Hash) *types.Header { dl.lock.RLock() defer dl.lock.RUnlock() + header := dl.ancientHeaders[hash] + if header != nil { + return header + } return dl.ownHeaders[hash] } @@ -139,6 +157,10 @@ func (dl *downloadTester) GetBlockByHash(hash common.Hash) *types.Block { dl.lock.RLock() defer dl.lock.RUnlock() + block := dl.ancientBlocks[hash] + if block != nil { + return block + } return dl.ownBlocks[hash] } @@ -148,6 +170,9 @@ func (dl *downloadTester) CurrentHeader() *types.Header { defer dl.lock.RUnlock() for i := len(dl.ownHashes) - 1; i >= 0; i-- { + if header := dl.ancientHeaders[dl.ownHashes[i]]; header != nil { + return header + } if header := dl.ownHeaders[dl.ownHashes[i]]; header != nil { return header } @@ -161,6 +186,12 @@ func (dl *downloadTester) CurrentBlock() *types.Block { defer dl.lock.RUnlock() for i := len(dl.ownHashes) - 1; i >= 0; i-- { + if block := dl.ancientBlocks[dl.ownHashes[i]]; block != nil { + if _, err := dl.stateDb.Get(block.Root().Bytes()); err == nil { + return block + } + return block + } if block := dl.ownBlocks[dl.ownHashes[i]]; block != nil { if _, err := dl.stateDb.Get(block.Root().Bytes()); err == nil { return block @@ -176,6 +207,9 @@ func (dl *downloadTester) CurrentFastBlock() *types.Block { defer dl.lock.RUnlock() for i := len(dl.ownHashes) - 1; i >= 0; i-- { + if block := dl.ancientBlocks[dl.ownHashes[i]]; block != nil { + return block + } if block := dl.ownBlocks[dl.ownHashes[i]]; block != nil { return block } @@ -198,6 +232,9 @@ func (dl *downloadTester) GetTd(hash common.Hash, number uint64) *big.Int { dl.lock.RLock() defer dl.lock.RUnlock() + if td := dl.ancientChainTd[hash]; td != nil { + return td + } return dl.ownChainTd[hash] } @@ -254,7 +291,7 @@ func (dl *downloadTester) InsertChain(blocks types.Blocks) (i int, err error) { } // InsertReceiptChain injects a new batch of receipts into the simulated chain. -func (dl *downloadTester) InsertReceiptChain(blocks types.Blocks, receipts []types.Receipts) (i int, err error) { +func (dl *downloadTester) InsertReceiptChain(blocks types.Blocks, receipts []types.Receipts, ancientLimit uint64) (i int, err error) { dl.lock.Lock() defer dl.lock.Unlock() @@ -262,11 +299,25 @@ func (dl *downloadTester) InsertReceiptChain(blocks types.Blocks, receipts []typ if _, ok := dl.ownHeaders[blocks[i].Hash()]; !ok { return i, errors.New("unknown owner") } - if _, ok := dl.ownBlocks[blocks[i].ParentHash()]; !ok { - return i, errors.New("unknown parent") + if _, ok := dl.ancientBlocks[blocks[i].ParentHash()]; !ok { + if _, ok := dl.ownBlocks[blocks[i].ParentHash()]; !ok { + return i, errors.New("unknown parent") + } + } + if blocks[i].NumberU64() <= ancientLimit { + dl.ancientBlocks[blocks[i].Hash()] = blocks[i] + dl.ancientReceipts[blocks[i].Hash()] = receipts[i] + + // Migrate from active db to ancient db + dl.ancientHeaders[blocks[i].Hash()] = blocks[i].Header() + dl.ancientChainTd[blocks[i].Hash()] = new(big.Int).Add(dl.ancientChainTd[blocks[i].ParentHash()], blocks[i].Difficulty()) + + delete(dl.ownHeaders, blocks[i].Hash()) + delete(dl.ownChainTd, blocks[i].Hash()) + } else { + dl.ownBlocks[blocks[i].Hash()] = blocks[i] + dl.ownReceipts[blocks[i].Hash()] = receipts[i] } - dl.ownBlocks[blocks[i].Hash()] = blocks[i] - dl.ownReceipts[blocks[i].Hash()] = receipts[i] } return len(blocks), nil } @@ -284,6 +335,11 @@ func (dl *downloadTester) Rollback(hashes []common.Hash) { delete(dl.ownHeaders, hashes[i]) delete(dl.ownReceipts, hashes[i]) delete(dl.ownBlocks, hashes[i]) + + delete(dl.ancientChainTd, hashes[i]) + delete(dl.ancientHeaders, hashes[i]) + delete(dl.ancientReceipts, hashes[i]) + delete(dl.ancientBlocks, hashes[i]) } } @@ -411,13 +467,13 @@ func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, leng if tester.downloader.mode == LightSync { blocks, receipts = 1, 1 } - if hs := len(tester.ownHeaders); hs != headers { + if hs := len(tester.ownHeaders) + len(tester.ancientHeaders) - 1; hs != headers { t.Fatalf("synchronised headers mismatch: have %v, want %v", hs, headers) } - if bs := len(tester.ownBlocks); bs != blocks { + if bs := len(tester.ownBlocks) + len(tester.ancientBlocks) - 1; bs != blocks { t.Fatalf("synchronised blocks mismatch: have %v, want %v", bs, blocks) } - if rs := len(tester.ownReceipts); rs != receipts { + if rs := len(tester.ownReceipts) + len(tester.ancientReceipts) - 1; rs != receipts { t.Fatalf("synchronised receipts mismatch: have %v, want %v", rs, receipts) } } diff --git a/ethdb/batch.go b/ethdb/batch.go index a9c406354..e261415bf 100644 --- a/ethdb/batch.go +++ b/ethdb/batch.go @@ -23,7 +23,7 @@ const IdealBatchSize = 100 * 1024 // Batch is a write-only database that commits changes to its host database // when Write is called. A batch cannot be used concurrently. type Batch interface { - Writer + KeyValueWriter // ValueSize retrieves the amount of data queued up for writing. ValueSize() int @@ -35,7 +35,7 @@ type Batch interface { Reset() // Replay replays the batch contents. - Replay(w Writer) error + Replay(w KeyValueWriter) error } // Batcher wraps the NewBatch method of a backing data store. diff --git a/ethdb/database.go b/ethdb/database.go index 01483f3d4..e3eff32db 100644 --- a/ethdb/database.go +++ b/ethdb/database.go @@ -19,8 +19,8 @@ package ethdb import "io" -// Reader wraps the Has and Get method of a backing data store. -type Reader interface { +// KeyValueReader wraps the Has and Get method of a backing data store. +type KeyValueReader interface { // Has retrieves if a key is present in the key-value data store. Has(key []byte) (bool, error) @@ -28,8 +28,8 @@ type Reader interface { Get(key []byte) ([]byte, error) } -// Writer wraps the Put method of a backing data store. -type Writer interface { +// KeyValueWriter wraps the Put method of a backing data store. +type KeyValueWriter interface { // Put inserts the given value into the key-value data store. Put(key []byte, value []byte) error @@ -58,8 +58,8 @@ type Compacter interface { // KeyValueStore contains all the methods required to allow handling different // key-value data stores backing the high level database. type KeyValueStore interface { - Reader - Writer + KeyValueReader + KeyValueWriter Batcher Iteratee Stater @@ -67,30 +67,58 @@ type KeyValueStore interface { io.Closer } -// Ancienter wraps the Ancient method for a backing immutable chain data store. -type Ancienter interface { +// AncientReader contains the methods required to read from immutable ancient data. +type AncientReader interface { + // HasAncient returns an indicator whether the specified data exists in the + // ancient store. + HasAncient(kind string, number uint64) (bool, error) + // Ancient retrieves an ancient binary blob from the append-only immutable files. Ancient(kind string, number uint64) ([]byte, error) + + // Ancients returns the ancient store length + Ancients() (uint64, error) } -// AncientReader contains the methods required to access both key-value as well as +// AncientWriter contains the methods required to write to immutable ancient data. +type AncientWriter interface { + // AppendAncient injects all binary blobs belong to block at the end of the + // append-only immutable table files. + AppendAncient(number uint64, hash, header, body, receipt, td []byte) error + + // TruncateAncients discards all but the first n ancient data from the ancient store. + TruncateAncients(n uint64) error + + // Sync flushes all in-memory ancient store data to disk. + Sync() error +} + +// Reader contains the methods required to read data from both key-value as well as // immutable ancient data. -type AncientReader interface { - Reader - Ancienter +type Reader interface { + KeyValueReader + AncientReader +} + +// Writer contains the methods required to write data to both key-value as well as +// immutable ancient data. +type Writer interface { + KeyValueWriter + AncientWriter } // AncientStore contains all the methods required to allow handling different // ancient data stores backing immutable chain data store. type AncientStore interface { - Ancienter + AncientReader + AncientWriter io.Closer } // Database contains all the methods required by the high level database to not // only access the key-value data store but also the chain freezer. type Database interface { - AncientReader + Reader Writer Batcher Iteratee diff --git a/ethdb/leveldb/leveldb.go b/ethdb/leveldb/leveldb.go index 8eabee50e..f74e94d92 100644 --- a/ethdb/leveldb/leveldb.go +++ b/ethdb/leveldb/leveldb.go @@ -425,13 +425,13 @@ func (b *batch) Reset() { } // Replay replays the batch contents. -func (b *batch) Replay(w ethdb.Writer) error { +func (b *batch) Replay(w ethdb.KeyValueWriter) error { return b.b.Replay(&replayer{writer: w}) } // replayer is a small wrapper to implement the correct replay methods. type replayer struct { - writer ethdb.Writer + writer ethdb.KeyValueWriter failure error } diff --git a/ethdb/memorydb/memorydb.go b/ethdb/memorydb/memorydb.go index cb8b27f3b..caa9b02a1 100644 --- a/ethdb/memorydb/memorydb.go +++ b/ethdb/memorydb/memorydb.go @@ -270,7 +270,7 @@ func (b *batch) Reset() { } // Replay replays the batch contents. -func (b *batch) Replay(w ethdb.Writer) error { +func (b *batch) Replay(w ethdb.KeyValueWriter) error { for _, keyvalue := range b.writes { if keyvalue.delete { if err := w.Delete(keyvalue.key); err != nil { diff --git a/les/odr_requests.go b/les/odr_requests.go index 328750d7a..89c609177 100644 --- a/les/odr_requests.go +++ b/les/odr_requests.go @@ -514,7 +514,7 @@ func (r *TxStatusRequest) Validate(db ethdb.Database, msg *Msg) error { // readTraceDB stores the keys of database reads. We use this to check that received node // sets contain only the trie nodes necessary to make proofs pass. type readTraceDB struct { - db ethdb.Reader + db ethdb.KeyValueReader reads map[string]struct{} } diff --git a/light/lightchain.go b/light/lightchain.go index 86836dfb5..f0beec47b 100644 --- a/light/lightchain.go +++ b/light/lightchain.go @@ -165,12 +165,12 @@ func (lc *LightChain) loadLastState() error { // SetHead rewinds the local chain to a new head. Everything above the new // head will be deleted and the new one set. -func (lc *LightChain) SetHead(head uint64) { +func (lc *LightChain) SetHead(head uint64) error { lc.chainmu.Lock() defer lc.chainmu.Unlock() - lc.hc.SetHead(head, nil) - lc.loadLastState() + lc.hc.SetHead(head, nil, nil) + return lc.loadLastState() } // GasLimit returns the gas limit of the current HEAD block. diff --git a/light/nodeset.go b/light/nodeset.go index a8bf4f6c6..366259678 100644 --- a/light/nodeset.go +++ b/light/nodeset.go @@ -115,7 +115,7 @@ func (db *NodeSet) NodeList() NodeList { } // Store writes the contents of the set to the given database -func (db *NodeSet) Store(target ethdb.Writer) { +func (db *NodeSet) Store(target ethdb.KeyValueWriter) { db.lock.RLock() defer db.lock.RUnlock() @@ -124,11 +124,11 @@ func (db *NodeSet) Store(target ethdb.Writer) { } } -// NodeList stores an ordered list of trie nodes. It implements ethdb.Writer. +// NodeList stores an ordered list of trie nodes. It implements ethdb.KeyValueWriter. type NodeList []rlp.RawValue // Store writes the contents of the list to the given database -func (n NodeList) Store(db ethdb.Writer) { +func (n NodeList) Store(db ethdb.KeyValueWriter) { for _, node := range n { db.Put(crypto.Keccak256(node), node) } diff --git a/light/trie.go b/light/trie.go index 27abb1dc2..e512bf6f9 100644 --- a/light/trie.go +++ b/light/trie.go @@ -141,7 +141,7 @@ func (t *odrTrie) GetKey(sha []byte) []byte { return nil } -func (t *odrTrie) Prove(key []byte, fromLevel uint, proofDb ethdb.Writer) error { +func (t *odrTrie) Prove(key []byte, fromLevel uint, proofDb ethdb.KeyValueWriter) error { return errors.New("not implemented, needs client/server interface split") } diff --git a/trie/database.go b/trie/database.go index 49a696bef..d8a0fa9c5 100644 --- a/trie/database.go +++ b/trie/database.go @@ -321,7 +321,7 @@ func NewDatabaseWithCache(diskdb ethdb.KeyValueStore, cache int) *Database { } // DiskDB retrieves the persistent storage backing the trie database. -func (db *Database) DiskDB() ethdb.Reader { +func (db *Database) DiskDB() ethdb.KeyValueReader { return db.diskdb } diff --git a/trie/proof.go b/trie/proof.go index 3c4b8d653..9985e730d 100644 --- a/trie/proof.go +++ b/trie/proof.go @@ -33,7 +33,7 @@ import ( // If the trie does not contain a value for key, the returned proof contains all // nodes of the longest existing prefix of the key (at least the root node), ending // with the node that proves the absence of the key. -func (t *Trie) Prove(key []byte, fromLevel uint, proofDb ethdb.Writer) error { +func (t *Trie) Prove(key []byte, fromLevel uint, proofDb ethdb.KeyValueWriter) error { // Collect all nodes on the path to key. key = keybytesToHex(key) var nodes []node @@ -96,16 +96,14 @@ func (t *Trie) Prove(key []byte, fromLevel uint, proofDb ethdb.Writer) error { // If the trie does not contain a value for key, the returned proof contains all // nodes of the longest existing prefix of the key (at least the root node), ending // with the node that proves the absence of the key. -func (t *SecureTrie) Prove(key []byte, fromLevel uint, proofDb ethdb.Writer) error { +func (t *SecureTrie) Prove(key []byte, fromLevel uint, proofDb ethdb.KeyValueWriter) error { return t.trie.Prove(key, fromLevel, proofDb) } // VerifyProof checks merkle proofs. The given proof must contain the value for // key in a trie with the given root hash. VerifyProof returns an error if the // proof contains invalid trie nodes or the wrong value. -// -// Note, the method assumes that all key-values in proofDb satisfy key = hash(value). -func VerifyProof(rootHash common.Hash, key []byte, proofDb ethdb.Reader) (value []byte, nodes int, err error) { +func VerifyProof(rootHash common.Hash, key []byte, proofDb ethdb.KeyValueReader) (value []byte, nodes int, err error) { key = keybytesToHex(key) wantHash := rootHash for i := 0; ; i++ { diff --git a/trie/sync.go b/trie/sync.go index d9564d783..6f40b45a1 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -72,7 +72,7 @@ func newSyncMemBatch() *syncMemBatch { // unknown trie hashes to retrieve, accepts node data associated with said hashes // and reconstructs the trie step by step until all is done. type Sync struct { - database ethdb.Reader // Persistent database to check for existing entries + database ethdb.KeyValueReader // Persistent database to check for existing entries membatch *syncMemBatch // Memory buffer to avoid frequent database writes requests map[common.Hash]*request // Pending requests pertaining to a key hash queue *prque.Prque // Priority queue with the pending requests @@ -80,7 +80,7 @@ type Sync struct { } // NewSync creates a new trie data download scheduler. -func NewSync(root common.Hash, database ethdb.Reader, callback LeafCallback, bloom *SyncBloom) *Sync { +func NewSync(root common.Hash, database ethdb.KeyValueReader, callback LeafCallback, bloom *SyncBloom) *Sync { ts := &Sync{ database: database, membatch: newSyncMemBatch(), @@ -224,7 +224,7 @@ func (s *Sync) Process(results []SyncResult) (bool, int, error) { // Commit flushes the data stored in the internal membatch out to persistent // storage, returning the number of items written and any occurred error. -func (s *Sync) Commit(dbw ethdb.Writer) (int, error) { +func (s *Sync) Commit(dbw ethdb.KeyValueWriter) (int, error) { // Dump the membatch into a database dbw for i, key := range s.membatch.order { if err := dbw.Put(key[:], s.membatch.batch[key]); err != nil {