From f2eb3b1c56a2b745a100760e5fabb78e339cb4cf Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Mon, 8 Jul 2019 10:24:16 +0200 Subject: [PATCH] core: lessen mem-spike during 1.8->1.9 conversion (#19610) * core/blockchain: lessen mem-spike during 1.8->1.9 conversion * core/blockchain.go: make levedb->freezer conversion gradually * core/blockchain: write the batch --- core/blockchain.go | 51 ++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 47 insertions(+), 4 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index b4c1716a9..8b7c2e615 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -927,6 +927,12 @@ func (bc *BlockChain) truncateAncient(head uint64) error { return nil } +// numberHash is just a container for a number and a hash, to represent a block +type numberHash struct { + number uint64 + hash common.Hash +} + // InsertReceiptChain attempts to complete an already existing header chain with // transaction and receipt data. func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain []types.Receipts, ancientLimit uint64) (int, error) { @@ -998,7 +1004,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ } } }() - var deleted types.Blocks + var deleted []*numberHash for i, block := range blockChain { // Short circuit insertion if shutting down or processing failed if atomic.LoadInt32(&bc.procInterrupt) == 1 { @@ -1033,12 +1039,40 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ // Always keep genesis block in active database. if b.NumberU64() != 0 { - deleted = append(deleted, b) + deleted = append(deleted, &numberHash{b.NumberU64(), b.Hash()}) } if time.Since(logged) > 8*time.Second { log.Info("Migrating ancient blocks", "count", count, "elapsed", common.PrettyDuration(time.Since(start))) logged = time.Now() } + // Don't collect too much in-memory, write it out every 100K blocks + if len(deleted) > 100000 { + + // Sync the ancient store explicitly to ensure all data has been flushed to disk. + if err := bc.db.Sync(); err != nil { + return 0, err + } + // Wipe out canonical block data. + for _, nh := range deleted { + rawdb.DeleteBlockWithoutNumber(batch, nh.hash, nh.number) + rawdb.DeleteCanonicalHash(batch, nh.number) + } + if err := batch.Write(); err != nil { + return 0, err + } + batch.Reset() + // Wipe out side chain too. + for _, nh := range deleted { + for _, hash := range rawdb.ReadAllHashes(bc.db, nh.number) { + rawdb.DeleteBlock(batch, hash, nh.number) + } + } + if err := batch.Write(); err != nil { + return 0, err + } + batch.Reset() + deleted = deleted[0:] + } } if count > 0 { log.Info("Migrated ancient blocks", "count", count, "elapsed", common.PrettyDuration(time.Since(start))) @@ -1066,7 +1100,11 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ previous = nil // disable rollback explicitly // Wipe out canonical block data. - for _, block := range append(deleted, blockChain...) { + for _, nh := range deleted { + rawdb.DeleteBlockWithoutNumber(batch, nh.hash, nh.number) + rawdb.DeleteCanonicalHash(batch, nh.number) + } + for _, block := range blockChain { // Always keep genesis block in active database. if block.NumberU64() != 0 { rawdb.DeleteBlockWithoutNumber(batch, block.Hash(), block.NumberU64()) @@ -1079,7 +1117,12 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ batch.Reset() // Wipe out side chain too. - for _, block := range append(deleted, blockChain...) { + for _, nh := range deleted { + for _, hash := range rawdb.ReadAllHashes(bc.db, nh.number) { + rawdb.DeleteBlock(batch, hash, nh.number) + } + } + for _, block := range blockChain { // Always keep genesis block in active database. if block.NumberU64() != 0 { for _, hash := range rawdb.ReadAllHashes(bc.db, block.NumberU64()) {