diff --git a/core/blockchain.go b/core/blockchain.go
index e14291aa5..66bc395c9 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -207,8 +207,7 @@ type BlockChain struct {
processor Processor // Block transaction processor interface
vmConfig vm.Config
- shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
- terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion.
+ shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
}
// NewBlockChain returns a fully initialised block chain using information
@@ -1085,38 +1084,6 @@ const (
SideStatTy
)
-// truncateAncient rewinds the blockchain to the specified header and deletes all
-// data in the ancient store that exceeds the specified header.
-func (bc *BlockChain) truncateAncient(head uint64) error {
- frozen, err := bc.db.Ancients()
- if err != nil {
- return err
- }
- // Short circuit if there is no data to truncate in ancient store.
- if frozen <= head+1 {
- return nil
- }
- // Truncate all the data in the freezer beyond the specified head
- if err := bc.db.TruncateAncients(head + 1); err != nil {
- return err
- }
- // Clear out any stale content from the caches
- bc.hc.headerCache.Purge()
- bc.hc.tdCache.Purge()
- bc.hc.numberCache.Purge()
-
- // Clear out any stale content from the caches
- bc.bodyCache.Purge()
- bc.bodyRLPCache.Purge()
- bc.receiptsCache.Purge()
- bc.blockCache.Purge()
- bc.txLookupCache.Purge()
- bc.futureBlocks.Purge()
-
- log.Info("Rewind ancient data", "number", head)
- return nil
-}
-
// numberHash is just a container for a number and a hash, to represent a block
type numberHash struct {
number uint64
@@ -1155,12 +1122,14 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
var (
stats = struct{ processed, ignored int32 }{}
start = time.Now()
- size = 0
+ size = int64(0)
)
+
// updateHead updates the head fast sync block if the inserted blocks are better
// and returns an indicator whether the inserted blocks are canonical.
updateHead := func(head *types.Block) bool {
bc.chainmu.Lock()
+ defer bc.chainmu.Unlock()
// Rewind may have occurred, skip in that case.
if bc.CurrentHeader().Number.Cmp(head.Number()) >= 0 {
@@ -1169,68 +1138,63 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
rawdb.WriteHeadFastBlockHash(bc.db, head.Hash())
bc.currentFastBlock.Store(head)
headFastBlockGauge.Update(int64(head.NumberU64()))
- bc.chainmu.Unlock()
return true
}
}
- bc.chainmu.Unlock()
return false
}
+
// writeAncient writes blockchain and corresponding receipt chain into ancient store.
//
// this function only accepts canonical chain data. All side chain will be reverted
// eventually.
writeAncient := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
- var (
- previous = bc.CurrentFastBlock()
- batch = bc.db.NewBatch()
- )
- // If any error occurs before updating the head or we are inserting a side chain,
- // all the data written this time wll be rolled back.
- defer func() {
- if previous != nil {
- if err := bc.truncateAncient(previous.NumberU64()); err != nil {
- log.Crit("Truncate ancient store failed", "err", err)
- }
- }
- }()
- var deleted []*numberHash
- for i, block := range blockChain {
- // Short circuit insertion if shutting down or processing failed
- if bc.insertStopped() {
- return 0, errInsertionInterrupted
- }
- // Short circuit insertion if it is required(used in testing only)
- if bc.terminateInsert != nil && bc.terminateInsert(block.Hash(), block.NumberU64()) {
- return i, errors.New("insertion is terminated for testing purpose")
- }
- // Short circuit if the owner header is unknown
- if !bc.HasHeader(block.Hash(), block.NumberU64()) {
- return i, fmt.Errorf("containing header #%d [%x..] unknown", block.Number(), block.Hash().Bytes()[:4])
- }
- if block.NumberU64() == 1 {
- // Make sure to write the genesis into the freezer
- if frozen, _ := bc.db.Ancients(); frozen == 0 {
- h := rawdb.ReadCanonicalHash(bc.db, 0)
- b := rawdb.ReadBlock(bc.db, h, 0)
- size += rawdb.WriteAncientBlock(bc.db, b, rawdb.ReadReceipts(bc.db, h, 0, bc.chainConfig), rawdb.ReadTd(bc.db, h, 0))
- log.Info("Wrote genesis to ancients")
- }
- }
- // Flush data into ancient database.
- size += rawdb.WriteAncientBlock(bc.db, block, receiptChain[i], bc.GetTd(block.Hash(), block.NumberU64()))
+ first := blockChain[0]
+ last := blockChain[len(blockChain)-1]
- // Write tx indices if any condition is satisfied:
- // * If user requires to reserve all tx indices(txlookuplimit=0)
- // * If all ancient tx indices are required to be reserved(txlookuplimit is even higher than ancientlimit)
- // * If block number is large enough to be regarded as a recent block
- // It means blocks below the ancientLimit-txlookupLimit won't be indexed.
- //
- // But if the `TxIndexTail` is not nil, e.g. Geth is initialized with
- // an external ancient database, during the setup, blockchain will start
- // a background routine to re-indexed all indices in [ancients - txlookupLimit, ancients)
- // range. In this case, all tx indices of newly imported blocks should be
- // generated.
+ // Ensure genesis is in ancients.
+ if first.NumberU64() == 1 {
+ if frozen, _ := bc.db.Ancients(); frozen == 0 {
+ b := bc.genesisBlock
+ td := bc.genesisBlock.Difficulty()
+ writeSize, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{b}, []types.Receipts{nil}, td)
+ size += writeSize
+ if err != nil {
+ log.Error("Error writing genesis to ancients", "err", err)
+ return 0, err
+ }
+ log.Info("Wrote genesis to ancients")
+ }
+ }
+ // Before writing the blocks to the ancients, we need to ensure that
+ // they correspond to the what the headerchain 'expects'.
+ // We only check the last block/header, since it's a contiguous chain.
+ if !bc.HasHeader(last.Hash(), last.NumberU64()) {
+ return 0, fmt.Errorf("containing header #%d [%x..] unknown", last.Number(), last.Hash().Bytes()[:4])
+ }
+
+ // Write all chain data to ancients.
+ td := bc.GetTd(first.Hash(), first.NumberU64())
+ writeSize, err := rawdb.WriteAncientBlocks(bc.db, blockChain, receiptChain, td)
+ size += writeSize
+ if err != nil {
+ log.Error("Error importing chain data to ancients", "err", err)
+ return 0, err
+ }
+
+ // Write tx indices if any condition is satisfied:
+ // * If user requires to reserve all tx indices(txlookuplimit=0)
+ // * If all ancient tx indices are required to be reserved(txlookuplimit is even higher than ancientlimit)
+ // * If block number is large enough to be regarded as a recent block
+ // It means blocks below the ancientLimit-txlookupLimit won't be indexed.
+ //
+ // But if the `TxIndexTail` is not nil, e.g. Geth is initialized with
+ // an external ancient database, during the setup, blockchain will start
+ // a background routine to re-indexed all indices in [ancients - txlookupLimit, ancients)
+ // range. In this case, all tx indices of newly imported blocks should be
+ // generated.
+ var batch = bc.db.NewBatch()
+ for _, block := range blockChain {
if bc.txLookupLimit == 0 || ancientLimit <= bc.txLookupLimit || block.NumberU64() >= ancientLimit-bc.txLookupLimit {
rawdb.WriteTxLookupEntriesByBlock(batch, block)
} else if rawdb.ReadTxIndexTail(bc.db) != nil {
@@ -1238,51 +1202,50 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
}
stats.processed++
}
+
// Flush all tx-lookup index data.
- size += batch.ValueSize()
+ size += int64(batch.ValueSize())
if err := batch.Write(); err != nil {
+ // The tx index data could not be written.
+ // Roll back the ancient store update.
+ fastBlock := bc.CurrentFastBlock().NumberU64()
+ if err := bc.db.TruncateAncients(fastBlock + 1); err != nil {
+ log.Error("Can't truncate ancient store after failed insert", "err", err)
+ }
return 0, err
}
- batch.Reset()
// Sync the ancient store explicitly to ensure all data has been flushed to disk.
if err := bc.db.Sync(); err != nil {
return 0, err
}
+
+ // Update the current fast block because all block data is now present in DB.
+ previousFastBlock := bc.CurrentFastBlock().NumberU64()
if !updateHead(blockChain[len(blockChain)-1]) {
- return 0, errors.New("side blocks can't be accepted as the ancient chain data")
- }
- previous = nil // disable rollback explicitly
-
- // Wipe out canonical block data.
- for _, nh := range deleted {
- rawdb.DeleteBlockWithoutNumber(batch, nh.hash, nh.number)
- rawdb.DeleteCanonicalHash(batch, nh.number)
- }
- for _, block := range blockChain {
- // Always keep genesis block in active database.
- if block.NumberU64() != 0 {
- rawdb.DeleteBlockWithoutNumber(batch, block.Hash(), block.NumberU64())
- rawdb.DeleteCanonicalHash(batch, block.NumberU64())
+ // We end up here if the header chain has reorg'ed, and the blocks/receipts
+ // don't match the canonical chain.
+ if err := bc.db.TruncateAncients(previousFastBlock + 1); err != nil {
+ log.Error("Can't truncate ancient store after failed insert", "err", err)
}
+ return 0, errSideChainReceipts
}
- if err := batch.Write(); err != nil {
- return 0, err
- }
+
+ // Delete block data from the main database.
batch.Reset()
-
- // Wipe out side chain too.
- for _, nh := range deleted {
- for _, hash := range rawdb.ReadAllHashes(bc.db, nh.number) {
- rawdb.DeleteBlock(batch, hash, nh.number)
- }
- }
+ canonHashes := make(map[common.Hash]struct{})
for _, block := range blockChain {
- // Always keep genesis block in active database.
- if block.NumberU64() != 0 {
- for _, hash := range rawdb.ReadAllHashes(bc.db, block.NumberU64()) {
- rawdb.DeleteBlock(batch, hash, block.NumberU64())
- }
+ canonHashes[block.Hash()] = struct{}{}
+ if block.NumberU64() == 0 {
+ continue
+ }
+ rawdb.DeleteCanonicalHash(batch, block.NumberU64())
+ rawdb.DeleteBlockWithoutNumber(batch, block.Hash(), block.NumberU64())
+ }
+ // Delete side chain hash-to-number mappings.
+ for _, nh := range rawdb.ReadAllHashesInRange(bc.db, first.NumberU64(), last.NumberU64()) {
+ if _, canon := canonHashes[nh.Hash]; !canon {
+ rawdb.DeleteHeader(batch, nh.Hash, nh.Number)
}
}
if err := batch.Write(); err != nil {
@@ -1290,6 +1253,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
}
return 0, nil
}
+
// writeLive writes blockchain and corresponding receipt chain into active store.
writeLive := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
skipPresenceCheck := false
@@ -1327,7 +1291,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
if err := batch.Write(); err != nil {
return 0, err
}
- size += batch.ValueSize()
+ size += int64(batch.ValueSize())
batch.Reset()
}
stats.processed++
@@ -1336,7 +1300,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
// we can ensure all components of body is completed(body, receipts,
// tx indexes)
if batch.ValueSize() > 0 {
- size += batch.ValueSize()
+ size += int64(batch.ValueSize())
if err := batch.Write(); err != nil {
return 0, err
}
@@ -1344,6 +1308,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
updateHead(blockChain[len(blockChain)-1])
return 0, nil
}
+
// Write downloaded chain data and corresponding receipt chain data
if len(ancientBlocks) > 0 {
if n, err := writeAncient(ancientBlocks, ancientReceipts); err != nil {
diff --git a/core/blockchain_test.go b/core/blockchain_test.go
index 4e5df633b..8d94f17aa 100644
--- a/core/blockchain_test.go
+++ b/core/blockchain_test.go
@@ -670,6 +670,7 @@ func TestFastVsFullChains(t *testing.T) {
if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(len(blocks)/2)); err != nil {
t.Fatalf("failed to insert receipt %d: %v", n, err)
}
+
// Iterate over all chain data components, and cross reference
for i := 0; i < len(blocks); i++ {
num, hash := blocks[i].NumberU64(), blocks[i].Hash()
@@ -693,10 +694,27 @@ func TestFastVsFullChains(t *testing.T) {
} else if types.CalcUncleHash(fblock.Uncles()) != types.CalcUncleHash(arblock.Uncles()) || types.CalcUncleHash(anblock.Uncles()) != types.CalcUncleHash(arblock.Uncles()) {
t.Errorf("block #%d [%x]: uncles mismatch: fastdb %v, ancientdb %v, archivedb %v", num, hash, fblock.Uncles(), anblock, arblock.Uncles())
}
- if freceipts, anreceipts, areceipts := rawdb.ReadReceipts(fastDb, hash, *rawdb.ReadHeaderNumber(fastDb, hash), fast.Config()), rawdb.ReadReceipts(ancientDb, hash, *rawdb.ReadHeaderNumber(ancientDb, hash), fast.Config()), rawdb.ReadReceipts(archiveDb, hash, *rawdb.ReadHeaderNumber(archiveDb, hash), fast.Config()); types.DeriveSha(freceipts, trie.NewStackTrie(nil)) != types.DeriveSha(areceipts, trie.NewStackTrie(nil)) {
+
+ // Check receipts.
+ freceipts := rawdb.ReadReceipts(fastDb, hash, num, fast.Config())
+ anreceipts := rawdb.ReadReceipts(ancientDb, hash, num, fast.Config())
+ areceipts := rawdb.ReadReceipts(archiveDb, hash, num, fast.Config())
+ if types.DeriveSha(freceipts, trie.NewStackTrie(nil)) != types.DeriveSha(areceipts, trie.NewStackTrie(nil)) {
t.Errorf("block #%d [%x]: receipts mismatch: fastdb %v, ancientdb %v, archivedb %v", num, hash, freceipts, anreceipts, areceipts)
}
+
+ // Check that hash-to-number mappings are present in all databases.
+ if m := rawdb.ReadHeaderNumber(fastDb, hash); m == nil || *m != num {
+ t.Errorf("block #%d [%x]: wrong hash-to-number mapping in fastdb: %v", num, hash, m)
+ }
+ if m := rawdb.ReadHeaderNumber(ancientDb, hash); m == nil || *m != num {
+ t.Errorf("block #%d [%x]: wrong hash-to-number mapping in ancientdb: %v", num, hash, m)
+ }
+ if m := rawdb.ReadHeaderNumber(archiveDb, hash); m == nil || *m != num {
+ t.Errorf("block #%d [%x]: wrong hash-to-number mapping in archivedb: %v", num, hash, m)
+ }
}
+
// Check that the canonical chains are the same between the databases
for i := 0; i < len(blocks)+1; i++ {
if fhash, ahash := rawdb.ReadCanonicalHash(fastDb, uint64(i)), rawdb.ReadCanonicalHash(archiveDb, uint64(i)); fhash != ahash {
@@ -1639,20 +1657,34 @@ func TestBlockchainRecovery(t *testing.T) {
}
}
-func TestIncompleteAncientReceiptChainInsertion(t *testing.T) {
- // Configure and generate a sample block chain
- var (
- gendb = rawdb.NewMemoryDatabase()
- key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
- address = crypto.PubkeyToAddress(key.PublicKey)
- funds = big.NewInt(1000000000)
- gspec = &Genesis{Config: params.TestChainConfig, Alloc: GenesisAlloc{address: {Balance: funds}}}
- genesis = gspec.MustCommit(gendb)
- )
- height := uint64(1024)
- blocks, receipts := GenerateChain(gspec.Config, genesis, ethash.NewFaker(), gendb, int(height), nil)
+// This test checks that InsertReceiptChain will roll back correctly when attempting to insert a side chain.
+func TestInsertReceiptChainRollback(t *testing.T) {
+ // Generate forked chain. The returned BlockChain object is used to process the side chain blocks.
+ tmpChain, sideblocks, canonblocks, err := getLongAndShortChains()
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer tmpChain.Stop()
+ // Get the side chain receipts.
+ if _, err := tmpChain.InsertChain(sideblocks); err != nil {
+ t.Fatal("processing side chain failed:", err)
+ }
+ t.Log("sidechain head:", tmpChain.CurrentBlock().Number(), tmpChain.CurrentBlock().Hash())
+ sidechainReceipts := make([]types.Receipts, len(sideblocks))
+ for i, block := range sideblocks {
+ sidechainReceipts[i] = tmpChain.GetReceiptsByHash(block.Hash())
+ }
+ // Get the canon chain receipts.
+ if _, err := tmpChain.InsertChain(canonblocks); err != nil {
+ t.Fatal("processing canon chain failed:", err)
+ }
+ t.Log("canon head:", tmpChain.CurrentBlock().Number(), tmpChain.CurrentBlock().Hash())
+ canonReceipts := make([]types.Receipts, len(canonblocks))
+ for i, block := range canonblocks {
+ canonReceipts[i] = tmpChain.GetReceiptsByHash(block.Hash())
+ }
- // Import the chain as a ancient-first node and ensure all pointers are updated
+ // Set up a BlockChain that uses the ancient store.
frdir, err := ioutil.TempDir("", "")
if err != nil {
t.Fatalf("failed to create temp freezer dir: %v", err)
@@ -1662,38 +1694,43 @@ func TestIncompleteAncientReceiptChainInsertion(t *testing.T) {
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
+ gspec := Genesis{Config: params.AllEthashProtocolChanges}
gspec.MustCommit(ancientDb)
- ancient, _ := NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil, nil)
- defer ancient.Stop()
+ ancientChain, _ := NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil, nil)
+ defer ancientChain.Stop()
- headers := make([]*types.Header, len(blocks))
- for i, block := range blocks {
- headers[i] = block.Header()
+ // Import the canonical header chain.
+ canonHeaders := make([]*types.Header, len(canonblocks))
+ for i, block := range canonblocks {
+ canonHeaders[i] = block.Header()
}
- if n, err := ancient.InsertHeaderChain(headers, 1); err != nil {
- t.Fatalf("failed to insert header %d: %v", n, err)
+ if _, err = ancientChain.InsertHeaderChain(canonHeaders, 1); err != nil {
+ t.Fatal("can't import canon headers:", err)
}
- // Abort ancient receipt chain insertion deliberately
- ancient.terminateInsert = func(hash common.Hash, number uint64) bool {
- return number == blocks[len(blocks)/2].NumberU64()
+
+ // Try to insert blocks/receipts of the side chain.
+ _, err = ancientChain.InsertReceiptChain(sideblocks, sidechainReceipts, uint64(len(sideblocks)))
+ if err == nil {
+ t.Fatal("expected error from InsertReceiptChain.")
}
- previousFastBlock := ancient.CurrentFastBlock()
- if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err == nil {
- t.Fatalf("failed to insert receipt %d: %v", n, err)
+ if ancientChain.CurrentFastBlock().NumberU64() != 0 {
+ t.Fatalf("failed to rollback ancient data, want %d, have %d", 0, ancientChain.CurrentFastBlock().NumberU64())
}
- if ancient.CurrentFastBlock().NumberU64() != previousFastBlock.NumberU64() {
- t.Fatalf("failed to rollback ancient data, want %d, have %d", previousFastBlock.NumberU64(), ancient.CurrentFastBlock().NumberU64())
+ if frozen, err := ancientChain.db.Ancients(); err != nil || frozen != 1 {
+ t.Fatalf("failed to truncate ancient data, frozen index is %d", frozen)
}
- if frozen, err := ancient.db.Ancients(); err != nil || frozen != 1 {
- t.Fatalf("failed to truncate ancient data")
+
+ // Insert blocks/receipts of the canonical chain.
+ _, err = ancientChain.InsertReceiptChain(canonblocks, canonReceipts, uint64(len(canonblocks)))
+ if err != nil {
+ t.Fatalf("can't import canon chain receipts: %v", err)
}
- ancient.terminateInsert = nil
- if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err != nil {
- t.Fatalf("failed to insert receipt %d: %v", n, err)
- }
- if ancient.CurrentFastBlock().NumberU64() != blocks[len(blocks)-1].NumberU64() {
+ if ancientChain.CurrentFastBlock().NumberU64() != canonblocks[len(canonblocks)-1].NumberU64() {
t.Fatalf("failed to insert ancient recept chain after rollback")
}
+ if frozen, _ := ancientChain.db.Ancients(); frozen != uint64(len(canonblocks))+1 {
+ t.Fatalf("wrong ancients count %d", frozen)
+ }
}
// Tests that importing a very large side fork, which is larger than the canon chain,
@@ -1958,9 +1995,8 @@ func testInsertKnownChainData(t *testing.T, typ string) {
asserter(t, blocks2[len(blocks2)-1])
}
-// getLongAndShortChains returns two chains,
-// A is longer, B is heavier
-func getLongAndShortChains() (*BlockChain, []*types.Block, []*types.Block, error) {
+// getLongAndShortChains returns two chains: A is longer, B is heavier.
+func getLongAndShortChains() (bc *BlockChain, longChain []*types.Block, heavyChain []*types.Block, err error) {
// Generate a canonical chain to act as the main dataset
engine := ethash.NewFaker()
db := rawdb.NewMemoryDatabase()
@@ -1968,7 +2004,7 @@ func getLongAndShortChains() (*BlockChain, []*types.Block, []*types.Block, error
// Generate and import the canonical chain,
// Offset the time, to keep the difficulty low
- longChain, _ := GenerateChain(params.TestChainConfig, genesis, engine, db, 80, func(i int, b *BlockGen) {
+ longChain, _ = GenerateChain(params.TestChainConfig, genesis, engine, db, 80, func(i int, b *BlockGen) {
b.SetCoinbase(common.Address{1})
})
diskdb := rawdb.NewMemoryDatabase()
@@ -1982,10 +2018,13 @@ func getLongAndShortChains() (*BlockChain, []*types.Block, []*types.Block, error
// Generate fork chain, make it shorter than canon, with common ancestor pretty early
parentIndex := 3
parent := longChain[parentIndex]
- heavyChain, _ := GenerateChain(params.TestChainConfig, parent, engine, db, 75, func(i int, b *BlockGen) {
+ heavyChainExt, _ := GenerateChain(params.TestChainConfig, parent, engine, db, 75, func(i int, b *BlockGen) {
b.SetCoinbase(common.Address{2})
b.OffsetTime(-9)
})
+ heavyChain = append(heavyChain, longChain[:parentIndex+1]...)
+ heavyChain = append(heavyChain, heavyChainExt...)
+
// Verify that the test is sane
var (
longerTd = new(big.Int)
diff --git a/core/error.go b/core/error.go
index 33cf874d1..594f3a26e 100644
--- a/core/error.go
+++ b/core/error.go
@@ -31,6 +31,8 @@ var (
// ErrNoGenesis is returned when there is no Genesis Block.
ErrNoGenesis = errors.New("genesis not found in chain")
+
+ errSideChainReceipts = errors.New("side blocks can't be accepted as ancient chain data")
)
// List of evm-call-message pre-checking errors. All state transition messages will
diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go
index 76132bf37..58226fb04 100644
--- a/core/rawdb/accessors_chain.go
+++ b/core/rawdb/accessors_chain.go
@@ -19,6 +19,7 @@ package rawdb
import (
"bytes"
"encoding/binary"
+ "fmt"
"math/big"
"sort"
@@ -81,6 +82,37 @@ func ReadAllHashes(db ethdb.Iteratee, number uint64) []common.Hash {
return hashes
}
+type NumberHash struct {
+ Number uint64
+ Hash common.Hash
+}
+
+// ReadAllHashes retrieves all the hashes assigned to blocks at a certain heights,
+// both canonical and reorged forks included.
+// This method considers both limits to be _inclusive_.
+func ReadAllHashesInRange(db ethdb.Iteratee, first, last uint64) []*NumberHash {
+ var (
+ start = encodeBlockNumber(first)
+ keyLength = len(headerPrefix) + 8 + 32
+ hashes = make([]*NumberHash, 0, 1+last-first)
+ it = db.NewIterator(headerPrefix, start)
+ )
+ defer it.Release()
+ for it.Next() {
+ key := it.Key()
+ if len(key) != keyLength {
+ continue
+ }
+ num := binary.BigEndian.Uint64(key[len(headerPrefix) : len(headerPrefix)+8])
+ if num > last {
+ break
+ }
+ hash := common.BytesToHash(key[len(key)-32:])
+ hashes = append(hashes, &NumberHash{num, hash})
+ }
+ return hashes
+}
+
// ReadAllCanonicalHashes retrieves all canonical number and hash mappings at the
// certain chain range. If the accumulated entries reaches the given threshold,
// abort the iteration and return the semi-finish result.
@@ -656,34 +688,48 @@ func WriteBlock(db ethdb.KeyValueWriter, block *types.Block) {
}
// WriteAncientBlock writes entire block data into ancient store and returns the total written size.
-func WriteAncientBlock(db ethdb.AncientWriter, block *types.Block, receipts types.Receipts, td *big.Int) int {
- // Encode all block components to RLP format.
- headerBlob, err := rlp.EncodeToBytes(block.Header())
- if err != nil {
- log.Crit("Failed to RLP encode block header", "err", err)
+func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts []types.Receipts, td *big.Int) (int64, error) {
+ var (
+ tdSum = new(big.Int).Set(td)
+ stReceipts []*types.ReceiptForStorage
+ )
+ return db.ModifyAncients(func(op ethdb.AncientWriteOp) error {
+ for i, block := range blocks {
+ // Convert receipts to storage format and sum up total difficulty.
+ stReceipts = stReceipts[:0]
+ for _, receipt := range receipts[i] {
+ stReceipts = append(stReceipts, (*types.ReceiptForStorage)(receipt))
+ }
+ header := block.Header()
+ if i > 0 {
+ tdSum.Add(tdSum, header.Difficulty)
+ }
+ if err := writeAncientBlock(op, block, header, stReceipts, tdSum); err != nil {
+ return err
+ }
+ }
+ return nil
+ })
+}
+
+func writeAncientBlock(op ethdb.AncientWriteOp, block *types.Block, header *types.Header, receipts []*types.ReceiptForStorage, td *big.Int) error {
+ num := block.NumberU64()
+ if err := op.AppendRaw(freezerHashTable, num, block.Hash().Bytes()); err != nil {
+ return fmt.Errorf("can't add block %d hash: %v", num, err)
}
- bodyBlob, err := rlp.EncodeToBytes(block.Body())
- if err != nil {
- log.Crit("Failed to RLP encode body", "err", err)
+ if err := op.Append(freezerHeaderTable, num, header); err != nil {
+ return fmt.Errorf("can't append block header %d: %v", num, err)
}
- storageReceipts := make([]*types.ReceiptForStorage, len(receipts))
- for i, receipt := range receipts {
- storageReceipts[i] = (*types.ReceiptForStorage)(receipt)
+ if err := op.Append(freezerBodiesTable, num, block.Body()); err != nil {
+ return fmt.Errorf("can't append block body %d: %v", num, err)
}
- receiptBlob, err := rlp.EncodeToBytes(storageReceipts)
- if err != nil {
- log.Crit("Failed to RLP encode block receipts", "err", err)
+ if err := op.Append(freezerReceiptTable, num, receipts); err != nil {
+ return fmt.Errorf("can't append block %d receipts: %v", num, err)
}
- tdBlob, err := rlp.EncodeToBytes(td)
- if err != nil {
- log.Crit("Failed to RLP encode block total difficulty", "err", err)
+ if err := op.Append(freezerDifficultyTable, num, td); err != nil {
+ return fmt.Errorf("can't append block %d total difficulty: %v", num, err)
}
- // Write all blob to flatten files.
- err = db.AppendAncient(block.NumberU64(), block.Hash().Bytes(), headerBlob, bodyBlob, receiptBlob, tdBlob)
- if err != nil {
- log.Crit("Failed to write block data to ancient store", "err", err)
- }
- return len(headerBlob) + len(bodyBlob) + len(receiptBlob) + len(tdBlob) + common.HashLength
+ return nil
}
// DeleteBlock removes all block data associated with a hash.
diff --git a/core/rawdb/accessors_chain_test.go b/core/rawdb/accessors_chain_test.go
index f20e8b1ff..58d7645e5 100644
--- a/core/rawdb/accessors_chain_test.go
+++ b/core/rawdb/accessors_chain_test.go
@@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"golang.org/x/crypto/sha3"
@@ -438,7 +439,7 @@ func TestAncientStorage(t *testing.T) {
if err != nil {
t.Fatalf("failed to create temp freezer dir: %v", err)
}
- defer os.Remove(frdir)
+ defer os.RemoveAll(frdir)
db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), frdir, "", false)
if err != nil {
@@ -467,8 +468,10 @@ func TestAncientStorage(t *testing.T) {
if blob := ReadTdRLP(db, hash, number); len(blob) > 0 {
t.Fatalf("non existent td returned")
}
+
// Write and verify the header in the database
- WriteAncientBlock(db, block, nil, big.NewInt(100))
+ WriteAncientBlocks(db, []*types.Block{block}, []types.Receipts{nil}, big.NewInt(100))
+
if blob := ReadHeaderRLP(db, hash, number); len(blob) == 0 {
t.Fatalf("no header returned")
}
@@ -481,6 +484,7 @@ func TestAncientStorage(t *testing.T) {
if blob := ReadTdRLP(db, hash, number); len(blob) == 0 {
t.Fatalf("no td returned")
}
+
// Use a fake hash for data retrieval, nothing should be returned.
fakeHash := common.BytesToHash([]byte{0x01, 0x02, 0x03})
if blob := ReadHeaderRLP(db, fakeHash, number); len(blob) != 0 {
@@ -528,3 +532,141 @@ func TestCanonicalHashIteration(t *testing.T) {
}
}
}
+
+func TestHashesInRange(t *testing.T) {
+ mkHeader := func(number, seq int) *types.Header {
+ h := types.Header{
+ Difficulty: new(big.Int),
+ Number: big.NewInt(int64(number)),
+ GasLimit: uint64(seq),
+ }
+ return &h
+ }
+ db := NewMemoryDatabase()
+ // For each number, write N versions of that particular number
+ total := 0
+ for i := 0; i < 15; i++ {
+ for ii := 0; ii < i; ii++ {
+ WriteHeader(db, mkHeader(i, ii))
+ total++
+ }
+ }
+ if have, want := len(ReadAllHashesInRange(db, 10, 10)), 10; have != want {
+ t.Fatalf("Wrong number of hashes read, want %d, got %d", want, have)
+ }
+ if have, want := len(ReadAllHashesInRange(db, 10, 9)), 0; have != want {
+ t.Fatalf("Wrong number of hashes read, want %d, got %d", want, have)
+ }
+ if have, want := len(ReadAllHashesInRange(db, 0, 100)), total; have != want {
+ t.Fatalf("Wrong number of hashes read, want %d, got %d", want, have)
+ }
+ if have, want := len(ReadAllHashesInRange(db, 9, 10)), 9+10; have != want {
+ t.Fatalf("Wrong number of hashes read, want %d, got %d", want, have)
+ }
+ if have, want := len(ReadAllHashes(db, 10)), 10; have != want {
+ t.Fatalf("Wrong number of hashes read, want %d, got %d", want, have)
+ }
+ if have, want := len(ReadAllHashes(db, 16)), 0; have != want {
+ t.Fatalf("Wrong number of hashes read, want %d, got %d", want, have)
+ }
+ if have, want := len(ReadAllHashes(db, 1)), 1; have != want {
+ t.Fatalf("Wrong number of hashes read, want %d, got %d", want, have)
+ }
+}
+
+// This measures the write speed of the WriteAncientBlocks operation.
+func BenchmarkWriteAncientBlocks(b *testing.B) {
+ // Open freezer database.
+ frdir, err := ioutil.TempDir("", "")
+ if err != nil {
+ b.Fatalf("failed to create temp freezer dir: %v", err)
+ }
+ defer os.RemoveAll(frdir)
+ db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), frdir, "", false)
+ if err != nil {
+ b.Fatalf("failed to create database with ancient backend")
+ }
+
+ // Create the data to insert. The blocks must have consecutive numbers, so we create
+ // all of them ahead of time. However, there is no need to create receipts
+ // individually for each block, just make one batch here and reuse it for all writes.
+ const batchSize = 128
+ const blockTxs = 20
+ allBlocks := makeTestBlocks(b.N, blockTxs)
+ batchReceipts := makeTestReceipts(batchSize, blockTxs)
+ b.ResetTimer()
+
+ // The benchmark loop writes batches of blocks, but note that the total block count is
+ // b.N. This means the resulting ns/op measurement is the time it takes to write a
+ // single block and its associated data.
+ var td = big.NewInt(55)
+ var totalSize int64
+ for i := 0; i < b.N; i += batchSize {
+ length := batchSize
+ if i+batchSize > b.N {
+ length = b.N - i
+ }
+
+ blocks := allBlocks[i : i+length]
+ receipts := batchReceipts[:length]
+ writeSize, err := WriteAncientBlocks(db, blocks, receipts, td)
+ if err != nil {
+ b.Fatal(err)
+ }
+ totalSize += writeSize
+ }
+
+ // Enable MB/s reporting.
+ b.SetBytes(totalSize / int64(b.N))
+}
+
+// makeTestBlocks creates fake blocks for the ancient write benchmark.
+func makeTestBlocks(nblock int, txsPerBlock int) []*types.Block {
+ key, _ := crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
+ signer := types.LatestSignerForChainID(big.NewInt(8))
+
+ // Create transactions.
+ txs := make([]*types.Transaction, txsPerBlock)
+ for i := 0; i < len(txs); i++ {
+ var err error
+ to := common.Address{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}
+ txs[i], err = types.SignNewTx(key, signer, &types.LegacyTx{
+ Nonce: 2,
+ GasPrice: big.NewInt(30000),
+ Gas: 0x45454545,
+ To: &to,
+ })
+ if err != nil {
+ panic(err)
+ }
+ }
+
+ // Create the blocks.
+ blocks := make([]*types.Block, nblock)
+ for i := 0; i < nblock; i++ {
+ header := &types.Header{
+ Number: big.NewInt(int64(i)),
+ Extra: []byte("test block"),
+ }
+ blocks[i] = types.NewBlockWithHeader(header).WithBody(txs, nil)
+ blocks[i].Hash() // pre-cache the block hash
+ }
+ return blocks
+}
+
+// makeTestReceipts creates fake receipts for the ancient write benchmark.
+func makeTestReceipts(n int, nPerBlock int) []types.Receipts {
+ receipts := make([]*types.Receipt, nPerBlock)
+ for i := 0; i < len(receipts); i++ {
+ receipts[i] = &types.Receipt{
+ Status: types.ReceiptStatusSuccessful,
+ CumulativeGasUsed: 0x888888888,
+ Logs: make([]*types.Log, 5),
+ }
+ }
+ allReceipts := make([]types.Receipts, n)
+ for i := 0; i < n; i++ {
+ allReceipts[i] = receipts
+ }
+ return allReceipts
+}
diff --git a/core/rawdb/database.go b/core/rawdb/database.go
index 90619169a..0e116ef99 100644
--- a/core/rawdb/database.go
+++ b/core/rawdb/database.go
@@ -104,9 +104,9 @@ func (db *nofreezedb) AncientSize(kind string) (uint64, error) {
return 0, errNotSupported
}
-// AppendAncient returns an error as we don't have a backing chain freezer.
-func (db *nofreezedb) AppendAncient(number uint64, hash, header, body, receipts, td []byte) error {
- return errNotSupported
+// ModifyAncients is not supported.
+func (db *nofreezedb) ModifyAncients(func(ethdb.AncientWriteOp) error) (int64, error) {
+ return 0, errNotSupported
}
// TruncateAncients returns an error as we don't have a backing chain freezer.
@@ -122,9 +122,7 @@ func (db *nofreezedb) Sync() error {
// NewDatabase creates a high level database on top of a given key-value data
// store without a freezer moving immutable chain segments into cold storage.
func NewDatabase(db ethdb.KeyValueStore) ethdb.Database {
- return &nofreezedb{
- KeyValueStore: db,
- }
+ return &nofreezedb{KeyValueStore: db}
}
// NewDatabaseWithFreezer creates a high level database on top of a given key-
@@ -132,7 +130,7 @@ func NewDatabase(db ethdb.KeyValueStore) ethdb.Database {
// storage.
func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace string, readonly bool) (ethdb.Database, error) {
// Create the idle freezer instance
- frdb, err := newFreezer(freezer, namespace, readonly)
+ frdb, err := newFreezer(freezer, namespace, readonly, freezerTableSize, FreezerNoSnappy)
if err != nil {
return nil, err
}
diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go
index 4262a615a..4cecbc50f 100644
--- a/core/rawdb/freezer.go
+++ b/core/rawdb/freezer.go
@@ -61,6 +61,9 @@ const (
// freezerBatchLimit is the maximum number of blocks to freeze in one batch
// before doing an fsync and deleting it from the key-value store.
freezerBatchLimit = 30000
+
+ // freezerTableSize defines the maximum size of freezer data files.
+ freezerTableSize = 2 * 1000 * 1000 * 1000
)
// freezer is an memory mapped append-only database to store immutable chain data
@@ -77,6 +80,10 @@ type freezer struct {
frozen uint64 // Number of blocks already frozen
threshold uint64 // Number of recent blocks not to freeze (params.FullImmutabilityThreshold apart from tests)
+ // This lock synchronizes writers and the truncate operation.
+ writeLock sync.Mutex
+ writeBatch *freezerBatch
+
readonly bool
tables map[string]*freezerTable // Data tables for storing everything
instanceLock fileutil.Releaser // File-system lock to prevent double opens
@@ -90,7 +97,10 @@ type freezer struct {
// newFreezer creates a chain freezer that moves ancient chain data into
// append-only flat file containers.
-func newFreezer(datadir string, namespace string, readonly bool) (*freezer, error) {
+//
+// The 'tables' argument defines the data tables. If the value of a map
+// entry is true, snappy compression is disabled for the table.
+func newFreezer(datadir string, namespace string, readonly bool, maxTableSize uint32, tables map[string]bool) (*freezer, error) {
// Create the initial freezer object
var (
readMeter = metrics.NewRegisteredMeter(namespace+"ancient/read", nil)
@@ -119,8 +129,10 @@ func newFreezer(datadir string, namespace string, readonly bool) (*freezer, erro
trigger: make(chan chan struct{}),
quit: make(chan struct{}),
}
- for name, disableSnappy := range FreezerNoSnappy {
- table, err := newTable(datadir, name, readMeter, writeMeter, sizeGauge, disableSnappy)
+
+ // Create the tables.
+ for name, disableSnappy := range tables {
+ table, err := newTable(datadir, name, readMeter, writeMeter, sizeGauge, maxTableSize, disableSnappy)
if err != nil {
for _, table := range freezer.tables {
table.Close()
@@ -130,6 +142,8 @@ func newFreezer(datadir string, namespace string, readonly bool) (*freezer, erro
}
freezer.tables[name] = table
}
+
+ // Truncate all tables to common length.
if err := freezer.repair(); err != nil {
for _, table := range freezer.tables {
table.Close()
@@ -137,12 +151,19 @@ func newFreezer(datadir string, namespace string, readonly bool) (*freezer, erro
lock.Release()
return nil, err
}
+
+ // Create the write batch.
+ freezer.writeBatch = newFreezerBatch(freezer)
+
log.Info("Opened ancient database", "database", datadir, "readonly", readonly)
return freezer, nil
}
// Close terminates the chain freezer, unmapping all the data files.
func (f *freezer) Close() error {
+ f.writeLock.Lock()
+ defer f.writeLock.Unlock()
+
var errs []error
f.closeOnce.Do(func() {
close(f.quit)
@@ -199,60 +220,49 @@ func (f *freezer) Ancients() (uint64, error) {
// AncientSize returns the ancient size of the specified category.
func (f *freezer) AncientSize(kind string) (uint64, error) {
+ // This needs the write lock to avoid data races on table fields.
+ // Speed doesn't matter here, AncientSize is for debugging.
+ f.writeLock.Lock()
+ defer f.writeLock.Unlock()
+
if table := f.tables[kind]; table != nil {
return table.size()
}
return 0, errUnknownTable
}
-// AppendAncient injects all binary blobs belong to block at the end of the
-// append-only immutable table files.
-//
-// Notably, this function is lock free but kind of thread-safe. All out-of-order
-// injection will be rejected. But if two injections with same number happen at
-// the same time, we can get into the trouble.
-func (f *freezer) AppendAncient(number uint64, hash, header, body, receipts, td []byte) (err error) {
+// ModifyAncients runs the given write operation.
+func (f *freezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize int64, err error) {
if f.readonly {
- return errReadOnly
+ return 0, errReadOnly
}
- // Ensure the binary blobs we are appending is continuous with freezer.
- if atomic.LoadUint64(&f.frozen) != number {
- return errOutOrderInsertion
- }
- // Rollback all inserted data if any insertion below failed to ensure
- // the tables won't out of sync.
+ f.writeLock.Lock()
+ defer f.writeLock.Unlock()
+
+ // Roll back all tables to the starting position in case of error.
+ prevItem := f.frozen
defer func() {
if err != nil {
- rerr := f.repair()
- if rerr != nil {
- log.Crit("Failed to repair freezer", "err", rerr)
+ // The write operation has failed. Go back to the previous item position.
+ for name, table := range f.tables {
+ err := table.truncate(prevItem)
+ if err != nil {
+ log.Error("Freezer table roll-back failed", "table", name, "index", prevItem, "err", err)
+ }
}
- log.Info("Append ancient failed", "number", number, "err", err)
}
}()
- // Inject all the components into the relevant data tables
- if err := f.tables[freezerHashTable].Append(f.frozen, hash[:]); err != nil {
- log.Error("Failed to append ancient hash", "number", f.frozen, "hash", hash, "err", err)
- return err
+
+ f.writeBatch.reset()
+ if err := fn(f.writeBatch); err != nil {
+ return 0, err
}
- if err := f.tables[freezerHeaderTable].Append(f.frozen, header); err != nil {
- log.Error("Failed to append ancient header", "number", f.frozen, "hash", hash, "err", err)
- return err
+ item, writeSize, err := f.writeBatch.commit()
+ if err != nil {
+ return 0, err
}
- if err := f.tables[freezerBodiesTable].Append(f.frozen, body); err != nil {
- log.Error("Failed to append ancient body", "number", f.frozen, "hash", hash, "err", err)
- return err
- }
- if err := f.tables[freezerReceiptTable].Append(f.frozen, receipts); err != nil {
- log.Error("Failed to append ancient receipts", "number", f.frozen, "hash", hash, "err", err)
- return err
- }
- if err := f.tables[freezerDifficultyTable].Append(f.frozen, td); err != nil {
- log.Error("Failed to append ancient difficulty", "number", f.frozen, "hash", hash, "err", err)
- return err
- }
- atomic.AddUint64(&f.frozen, 1) // Only modify atomically
- return nil
+ atomic.StoreUint64(&f.frozen, item)
+ return writeSize, nil
}
// TruncateAncients discards any recent data above the provided threshold number.
@@ -260,6 +270,9 @@ func (f *freezer) TruncateAncients(items uint64) error {
if f.readonly {
return errReadOnly
}
+ f.writeLock.Lock()
+ defer f.writeLock.Unlock()
+
if atomic.LoadUint64(&f.frozen) <= items {
return nil
}
@@ -286,6 +299,24 @@ func (f *freezer) Sync() error {
return nil
}
+// repair truncates all data tables to the same length.
+func (f *freezer) repair() error {
+ min := uint64(math.MaxUint64)
+ for _, table := range f.tables {
+ items := atomic.LoadUint64(&table.items)
+ if min > items {
+ min = items
+ }
+ }
+ for _, table := range f.tables {
+ if err := table.truncate(min); err != nil {
+ return err
+ }
+ }
+ atomic.StoreUint64(&f.frozen, min)
+ return nil
+}
+
// freeze is a background thread that periodically checks the blockchain for any
// import progress and moves ancient data from the fast database into the freezer.
//
@@ -352,54 +383,28 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) {
backoff = true
continue
}
+
// Seems we have data ready to be frozen, process in usable batches
- limit := *number - threshold
- if limit-f.frozen > freezerBatchLimit {
- limit = f.frozen + freezerBatchLimit
- }
var (
start = time.Now()
- first = f.frozen
- ancients = make([]common.Hash, 0, limit-f.frozen)
+ first, _ = f.Ancients()
+ limit = *number - threshold
)
- for f.frozen <= limit {
- // Retrieves all the components of the canonical block
- hash := ReadCanonicalHash(nfdb, f.frozen)
- if hash == (common.Hash{}) {
- log.Error("Canonical hash missing, can't freeze", "number", f.frozen)
- break
- }
- header := ReadHeaderRLP(nfdb, hash, f.frozen)
- if len(header) == 0 {
- log.Error("Block header missing, can't freeze", "number", f.frozen, "hash", hash)
- break
- }
- body := ReadBodyRLP(nfdb, hash, f.frozen)
- if len(body) == 0 {
- log.Error("Block body missing, can't freeze", "number", f.frozen, "hash", hash)
- break
- }
- receipts := ReadReceiptsRLP(nfdb, hash, f.frozen)
- if len(receipts) == 0 {
- log.Error("Block receipts missing, can't freeze", "number", f.frozen, "hash", hash)
- break
- }
- td := ReadTdRLP(nfdb, hash, f.frozen)
- if len(td) == 0 {
- log.Error("Total difficulty missing, can't freeze", "number", f.frozen, "hash", hash)
- break
- }
- log.Trace("Deep froze ancient block", "number", f.frozen, "hash", hash)
- // Inject all the components into the relevant data tables
- if err := f.AppendAncient(f.frozen, hash[:], header, body, receipts, td); err != nil {
- break
- }
- ancients = append(ancients, hash)
+ if limit-first > freezerBatchLimit {
+ limit = first + freezerBatchLimit
}
+ ancients, err := f.freezeRange(nfdb, first, limit)
+ if err != nil {
+ log.Error("Error in block freeze operation", "err", err)
+ backoff = true
+ continue
+ }
+
// Batch of blocks have been frozen, flush them before wiping from leveldb
if err := f.Sync(); err != nil {
log.Crit("Failed to flush frozen tables", "err", err)
}
+
// Wipe out all data from the active database
batch := db.NewBatch()
for i := 0; i < len(ancients); i++ {
@@ -464,6 +469,7 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) {
log.Crit("Failed to delete dangling side blocks", "err", err)
}
}
+
// Log something friendly for the user
context := []interface{}{
"blocks", f.frozen - first, "elapsed", common.PrettyDuration(time.Since(start)), "number", f.frozen - 1,
@@ -480,20 +486,54 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) {
}
}
-// repair truncates all data tables to the same length.
-func (f *freezer) repair() error {
- min := uint64(math.MaxUint64)
- for _, table := range f.tables {
- items := atomic.LoadUint64(&table.items)
- if min > items {
- min = items
+func (f *freezer) freezeRange(nfdb *nofreezedb, number, limit uint64) (hashes []common.Hash, err error) {
+ hashes = make([]common.Hash, 0, limit-number)
+
+ _, err = f.ModifyAncients(func(op ethdb.AncientWriteOp) error {
+ for ; number <= limit; number++ {
+ // Retrieve all the components of the canonical block.
+ hash := ReadCanonicalHash(nfdb, number)
+ if hash == (common.Hash{}) {
+ return fmt.Errorf("canonical hash missing, can't freeze block %d", number)
+ }
+ header := ReadHeaderRLP(nfdb, hash, number)
+ if len(header) == 0 {
+ return fmt.Errorf("block header missing, can't freeze block %d", number)
+ }
+ body := ReadBodyRLP(nfdb, hash, number)
+ if len(body) == 0 {
+ return fmt.Errorf("block body missing, can't freeze block %d", number)
+ }
+ receipts := ReadReceiptsRLP(nfdb, hash, number)
+ if len(receipts) == 0 {
+ return fmt.Errorf("block receipts missing, can't freeze block %d", number)
+ }
+ td := ReadTdRLP(nfdb, hash, number)
+ if len(td) == 0 {
+ return fmt.Errorf("total difficulty missing, can't freeze block %d", number)
+ }
+
+ // Write to the batch.
+ if err := op.AppendRaw(freezerHashTable, number, hash[:]); err != nil {
+ return fmt.Errorf("can't write hash to freezer: %v", err)
+ }
+ if err := op.AppendRaw(freezerHeaderTable, number, header); err != nil {
+ return fmt.Errorf("can't write header to freezer: %v", err)
+ }
+ if err := op.AppendRaw(freezerBodiesTable, number, body); err != nil {
+ return fmt.Errorf("can't write body to freezer: %v", err)
+ }
+ if err := op.AppendRaw(freezerReceiptTable, number, receipts); err != nil {
+ return fmt.Errorf("can't write receipts to freezer: %v", err)
+ }
+ if err := op.AppendRaw(freezerDifficultyTable, number, td); err != nil {
+ return fmt.Errorf("can't write td to freezer: %v", err)
+ }
+
+ hashes = append(hashes, hash)
}
- }
- for _, table := range f.tables {
- if err := table.truncate(min); err != nil {
- return err
- }
- }
- atomic.StoreUint64(&f.frozen, min)
- return nil
+ return nil
+ })
+
+ return hashes, err
}
diff --git a/core/rawdb/freezer_batch.go b/core/rawdb/freezer_batch.go
new file mode 100644
index 000000000..8297c0ac1
--- /dev/null
+++ b/core/rawdb/freezer_batch.go
@@ -0,0 +1,248 @@
+// Copyright 2021 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+package rawdb
+
+import (
+ "fmt"
+ "sync/atomic"
+
+ "github.com/ethereum/go-ethereum/common/math"
+ "github.com/ethereum/go-ethereum/rlp"
+ "github.com/golang/snappy"
+)
+
+// This is the maximum amount of data that will be buffered in memory
+// for a single freezer table batch.
+const freezerBatchBufferLimit = 2 * 1024 * 1024
+
+// freezerBatch is a write operation of multiple items on a freezer.
+type freezerBatch struct {
+ tables map[string]*freezerTableBatch
+}
+
+func newFreezerBatch(f *freezer) *freezerBatch {
+ batch := &freezerBatch{tables: make(map[string]*freezerTableBatch, len(f.tables))}
+ for kind, table := range f.tables {
+ batch.tables[kind] = table.newBatch()
+ }
+ return batch
+}
+
+// Append adds an RLP-encoded item of the given kind.
+func (batch *freezerBatch) Append(kind string, num uint64, item interface{}) error {
+ return batch.tables[kind].Append(num, item)
+}
+
+// AppendRaw adds an item of the given kind.
+func (batch *freezerBatch) AppendRaw(kind string, num uint64, item []byte) error {
+ return batch.tables[kind].AppendRaw(num, item)
+}
+
+// reset initializes the batch.
+func (batch *freezerBatch) reset() {
+ for _, tb := range batch.tables {
+ tb.reset()
+ }
+}
+
+// commit is called at the end of a write operation and
+// writes all remaining data to tables.
+func (batch *freezerBatch) commit() (item uint64, writeSize int64, err error) {
+ // Check that count agrees on all batches.
+ item = uint64(math.MaxUint64)
+ for name, tb := range batch.tables {
+ if item < math.MaxUint64 && tb.curItem != item {
+ return 0, 0, fmt.Errorf("table %s is at item %d, want %d", name, tb.curItem, item)
+ }
+ item = tb.curItem
+ }
+
+ // Commit all table batches.
+ for _, tb := range batch.tables {
+ if err := tb.commit(); err != nil {
+ return 0, 0, err
+ }
+ writeSize += tb.totalBytes
+ }
+ return item, writeSize, nil
+}
+
+// freezerTableBatch is a batch for a freezer table.
+type freezerTableBatch struct {
+ t *freezerTable
+
+ sb *snappyBuffer
+ encBuffer writeBuffer
+ dataBuffer []byte
+ indexBuffer []byte
+ curItem uint64 // expected index of next append
+ totalBytes int64 // counts written bytes since reset
+}
+
+// newBatch creates a new batch for the freezer table.
+func (t *freezerTable) newBatch() *freezerTableBatch {
+ batch := &freezerTableBatch{t: t}
+ if !t.noCompression {
+ batch.sb = new(snappyBuffer)
+ }
+ batch.reset()
+ return batch
+}
+
+// reset clears the batch for reuse.
+func (batch *freezerTableBatch) reset() {
+ batch.dataBuffer = batch.dataBuffer[:0]
+ batch.indexBuffer = batch.indexBuffer[:0]
+ batch.curItem = atomic.LoadUint64(&batch.t.items)
+ batch.totalBytes = 0
+}
+
+// Append rlp-encodes and adds data at the end of the freezer table. The item number is a
+// precautionary parameter to ensure data correctness, but the table will reject already
+// existing data.
+func (batch *freezerTableBatch) Append(item uint64, data interface{}) error {
+ if item != batch.curItem {
+ return errOutOrderInsertion
+ }
+
+ // Encode the item.
+ batch.encBuffer.Reset()
+ if err := rlp.Encode(&batch.encBuffer, data); err != nil {
+ return err
+ }
+ encItem := batch.encBuffer.data
+ if batch.sb != nil {
+ encItem = batch.sb.compress(encItem)
+ }
+ return batch.appendItem(encItem)
+}
+
+// AppendRaw injects a binary blob at the end of the freezer table. The item number is a
+// precautionary parameter to ensure data correctness, but the table will reject already
+// existing data.
+func (batch *freezerTableBatch) AppendRaw(item uint64, blob []byte) error {
+ if item != batch.curItem {
+ return errOutOrderInsertion
+ }
+
+ encItem := blob
+ if batch.sb != nil {
+ encItem = batch.sb.compress(blob)
+ }
+ return batch.appendItem(encItem)
+}
+
+func (batch *freezerTableBatch) appendItem(data []byte) error {
+ // Check if item fits into current data file.
+ itemSize := int64(len(data))
+ itemOffset := batch.t.headBytes + int64(len(batch.dataBuffer))
+ if itemOffset+itemSize > int64(batch.t.maxFileSize) {
+ // It doesn't fit, go to next file first.
+ if err := batch.commit(); err != nil {
+ return err
+ }
+ if err := batch.t.advanceHead(); err != nil {
+ return err
+ }
+ itemOffset = 0
+ }
+
+ // Put data to buffer.
+ batch.dataBuffer = append(batch.dataBuffer, data...)
+ batch.totalBytes += itemSize
+
+ // Put index entry to buffer.
+ entry := indexEntry{filenum: batch.t.headId, offset: uint32(itemOffset + itemSize)}
+ batch.indexBuffer = entry.append(batch.indexBuffer)
+ batch.curItem++
+
+ return batch.maybeCommit()
+}
+
+// maybeCommit writes the buffered data if the buffer is full enough.
+func (batch *freezerTableBatch) maybeCommit() error {
+ if len(batch.dataBuffer) > freezerBatchBufferLimit {
+ return batch.commit()
+ }
+ return nil
+}
+
+// commit writes the batched items to the backing freezerTable.
+func (batch *freezerTableBatch) commit() error {
+ // Write data.
+ _, err := batch.t.head.Write(batch.dataBuffer)
+ if err != nil {
+ return err
+ }
+ dataSize := int64(len(batch.dataBuffer))
+ batch.dataBuffer = batch.dataBuffer[:0]
+
+ // Write index.
+ _, err = batch.t.index.Write(batch.indexBuffer)
+ if err != nil {
+ return err
+ }
+ indexSize := int64(len(batch.indexBuffer))
+ batch.indexBuffer = batch.indexBuffer[:0]
+
+ // Update headBytes of table.
+ batch.t.headBytes += dataSize
+ atomic.StoreUint64(&batch.t.items, batch.curItem)
+
+ // Update metrics.
+ batch.t.sizeGauge.Inc(dataSize + indexSize)
+ batch.t.writeMeter.Mark(dataSize + indexSize)
+ return nil
+}
+
+// snappyBuffer writes snappy in block format, and can be reused. It is
+// reset when WriteTo is called.
+type snappyBuffer struct {
+ dst []byte
+}
+
+// compress snappy-compresses the data.
+func (s *snappyBuffer) compress(data []byte) []byte {
+ // The snappy library does not care what the capacity of the buffer is,
+ // but only checks the length. If the length is too small, it will
+ // allocate a brand new buffer.
+ // To avoid that, we check the required size here, and grow the size of the
+ // buffer to utilize the full capacity.
+ if n := snappy.MaxEncodedLen(len(data)); len(s.dst) < n {
+ if cap(s.dst) < n {
+ s.dst = make([]byte, n)
+ }
+ s.dst = s.dst[:n]
+ }
+
+ s.dst = snappy.Encode(s.dst, data)
+ return s.dst
+}
+
+// writeBuffer implements io.Writer for a byte slice.
+type writeBuffer struct {
+ data []byte
+}
+
+func (wb *writeBuffer) Write(data []byte) (int, error) {
+ wb.data = append(wb.data, data...)
+ return len(data), nil
+}
+
+func (wb *writeBuffer) Reset() {
+ wb.data = wb.data[:0]
+}
diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go
index 9d052f7cd..22405cf9b 100644
--- a/core/rawdb/freezer_table.go
+++ b/core/rawdb/freezer_table.go
@@ -17,6 +17,7 @@
package rawdb
import (
+ "bytes"
"encoding/binary"
"errors"
"fmt"
@@ -55,19 +56,20 @@ type indexEntry struct {
const indexEntrySize = 6
-// unmarshallBinary deserializes binary b into the rawIndex entry.
+// unmarshalBinary deserializes binary b into the rawIndex entry.
func (i *indexEntry) unmarshalBinary(b []byte) error {
i.filenum = uint32(binary.BigEndian.Uint16(b[:2]))
i.offset = binary.BigEndian.Uint32(b[2:6])
return nil
}
-// marshallBinary serializes the rawIndex entry into binary.
-func (i *indexEntry) marshallBinary() []byte {
- b := make([]byte, indexEntrySize)
- binary.BigEndian.PutUint16(b[:2], uint16(i.filenum))
- binary.BigEndian.PutUint32(b[2:6], i.offset)
- return b
+// append adds the encoded entry to the end of b.
+func (i *indexEntry) append(b []byte) []byte {
+ offset := len(b)
+ out := append(b, make([]byte, indexEntrySize)...)
+ binary.BigEndian.PutUint16(out[offset:], uint16(i.filenum))
+ binary.BigEndian.PutUint32(out[offset+2:], i.offset)
+ return out
}
// bounds returns the start- and end- offsets, and the file number of where to
@@ -107,7 +109,7 @@ type freezerTable struct {
// to count how many historic items have gone missing.
itemOffset uint32 // Offset (number of discarded items)
- headBytes uint32 // Number of bytes written to the head file
+ headBytes int64 // Number of bytes written to the head file
readMeter metrics.Meter // Meter for measuring the effective amount of data read
writeMeter metrics.Meter // Meter for measuring the effective amount of data written
sizeGauge metrics.Gauge // Gauge for tracking the combined size of all freezer tables
@@ -118,12 +120,7 @@ type freezerTable struct {
// NewFreezerTable opens the given path as a freezer table.
func NewFreezerTable(path, name string, disableSnappy bool) (*freezerTable, error) {
- return newTable(path, name, metrics.NilMeter{}, metrics.NilMeter{}, metrics.NilGauge{}, disableSnappy)
-}
-
-// newTable opens a freezer table with default settings - 2G files
-func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeGauge metrics.Gauge, disableSnappy bool) (*freezerTable, error) {
- return newCustomTable(path, name, readMeter, writeMeter, sizeGauge, 2*1000*1000*1000, disableSnappy)
+ return newTable(path, name, metrics.NilMeter{}, metrics.NilMeter{}, metrics.NilGauge{}, freezerTableSize, disableSnappy)
}
// openFreezerFileForAppend opens a freezer table file and seeks to the end
@@ -164,10 +161,10 @@ func truncateFreezerFile(file *os.File, size int64) error {
return nil
}
-// newCustomTable opens a freezer table, creating the data and index files if they are
+// newTable opens a freezer table, creating the data and index files if they are
// non existent. Both files are truncated to the shortest common length to ensure
// they don't go out of sync.
-func newCustomTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeGauge metrics.Gauge, maxFilesize uint32, noCompression bool) (*freezerTable, error) {
+func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeGauge metrics.Gauge, maxFilesize uint32, noCompression bool) (*freezerTable, error) {
// Ensure the containing directory exists and open the indexEntry file
if err := os.MkdirAll(path, 0755); err != nil {
return nil, err
@@ -313,7 +310,7 @@ func (t *freezerTable) repair() error {
}
// Update the item and byte counters and return
t.items = uint64(t.itemOffset) + uint64(offsetsSize/indexEntrySize-1) // last indexEntry points to the end of the data file
- t.headBytes = uint32(contentSize)
+ t.headBytes = contentSize
t.headId = lastIndex.filenum
// Close opened files and preopen all files
@@ -387,14 +384,14 @@ func (t *freezerTable) truncate(items uint64) error {
t.releaseFilesAfter(expected.filenum, true)
// Set back the historic head
t.head = newHead
- atomic.StoreUint32(&t.headId, expected.filenum)
+ t.headId = expected.filenum
}
if err := truncateFreezerFile(t.head, int64(expected.offset)); err != nil {
return err
}
// All data files truncated, set internal counters and return
+ t.headBytes = int64(expected.offset)
atomic.StoreUint64(&t.items, items)
- atomic.StoreUint32(&t.headBytes, expected.offset)
// Retrieve the new size and update the total size counter
newSize, err := t.sizeNolock()
@@ -471,94 +468,6 @@ func (t *freezerTable) releaseFilesAfter(num uint32, remove bool) {
}
}
-// Append injects a binary blob at the end of the freezer table. The item number
-// is a precautionary parameter to ensure data correctness, but the table will
-// reject already existing data.
-//
-// Note, this method will *not* flush any data to disk so be sure to explicitly
-// fsync before irreversibly deleting data from the database.
-func (t *freezerTable) Append(item uint64, blob []byte) error {
- // Encode the blob before the lock portion
- if !t.noCompression {
- blob = snappy.Encode(nil, blob)
- }
- // Read lock prevents competition with truncate
- retry, err := t.append(item, blob, false)
- if err != nil {
- return err
- }
- if retry {
- // Read lock was insufficient, retry with a writelock
- _, err = t.append(item, blob, true)
- }
- return err
-}
-
-// append injects a binary blob at the end of the freezer table.
-// Normally, inserts do not require holding the write-lock, so it should be invoked with 'wlock' set to
-// false.
-// However, if the data will grown the current file out of bounds, then this
-// method will return 'true, nil', indicating that the caller should retry, this time
-// with 'wlock' set to true.
-func (t *freezerTable) append(item uint64, encodedBlob []byte, wlock bool) (bool, error) {
- if wlock {
- t.lock.Lock()
- defer t.lock.Unlock()
- } else {
- t.lock.RLock()
- defer t.lock.RUnlock()
- }
- // Ensure the table is still accessible
- if t.index == nil || t.head == nil {
- return false, errClosed
- }
- // Ensure only the next item can be written, nothing else
- if atomic.LoadUint64(&t.items) != item {
- return false, fmt.Errorf("appending unexpected item: want %d, have %d", t.items, item)
- }
- bLen := uint32(len(encodedBlob))
- if t.headBytes+bLen < bLen ||
- t.headBytes+bLen > t.maxFileSize {
- // Writing would overflow, so we need to open a new data file.
- // If we don't already hold the writelock, abort and let the caller
- // invoke this method a second time.
- if !wlock {
- return true, nil
- }
- nextID := atomic.LoadUint32(&t.headId) + 1
- // We open the next file in truncated mode -- if this file already
- // exists, we need to start over from scratch on it
- newHead, err := t.openFile(nextID, openFreezerFileTruncated)
- if err != nil {
- return false, err
- }
- // Close old file, and reopen in RDONLY mode
- t.releaseFile(t.headId)
- t.openFile(t.headId, openFreezerFileForReadOnly)
-
- // Swap out the current head
- t.head = newHead
- atomic.StoreUint32(&t.headBytes, 0)
- atomic.StoreUint32(&t.headId, nextID)
- }
- if _, err := t.head.Write(encodedBlob); err != nil {
- return false, err
- }
- newOffset := atomic.AddUint32(&t.headBytes, bLen)
- idx := indexEntry{
- filenum: atomic.LoadUint32(&t.headId),
- offset: newOffset,
- }
- // Write indexEntry
- t.index.Write(idx.marshallBinary())
-
- t.writeMeter.Mark(int64(bLen + indexEntrySize))
- t.sizeGauge.Inc(int64(bLen + indexEntrySize))
-
- atomic.AddUint64(&t.items, 1)
- return false, nil
-}
-
// getIndices returns the index entries for the given from-item, covering 'count' items.
// N.B: The actual number of returned indices for N items will always be N+1 (unless an
// error is returned).
@@ -651,6 +560,7 @@ func (t *freezerTable) RetrieveItems(start, count, maxBytes uint64) ([][]byte, e
func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []int, error) {
t.lock.RLock()
defer t.lock.RUnlock()
+
// Ensure the table and the item is accessible
if t.index == nil || t.head == nil {
return nil, nil, errClosed
@@ -763,6 +673,32 @@ func (t *freezerTable) sizeNolock() (uint64, error) {
return total, nil
}
+// advanceHead should be called when the current head file would outgrow the file limits,
+// and a new file must be opened. The caller of this method must hold the write-lock
+// before calling this method.
+func (t *freezerTable) advanceHead() error {
+ t.lock.Lock()
+ defer t.lock.Unlock()
+
+ // We open the next file in truncated mode -- if this file already
+ // exists, we need to start over from scratch on it.
+ nextID := t.headId + 1
+ newHead, err := t.openFile(nextID, openFreezerFileTruncated)
+ if err != nil {
+ return err
+ }
+
+ // Close old file, and reopen in RDONLY mode.
+ t.releaseFile(t.headId)
+ t.openFile(t.headId, openFreezerFileForReadOnly)
+
+ // Swap out the current head.
+ t.head = newHead
+ t.headBytes = 0
+ t.headId = nextID
+ return nil
+}
+
// Sync pushes any pending data from memory out to disk. This is an expensive
// operation, so use it with care.
func (t *freezerTable) Sync() error {
@@ -775,10 +711,21 @@ func (t *freezerTable) Sync() error {
// DumpIndex is a debug print utility function, mainly for testing. It can also
// be used to analyse a live freezer table index.
func (t *freezerTable) DumpIndex(start, stop int64) {
+ t.dumpIndex(os.Stdout, start, stop)
+}
+
+func (t *freezerTable) dumpIndexString(start, stop int64) string {
+ var out bytes.Buffer
+ out.WriteString("\n")
+ t.dumpIndex(&out, start, stop)
+ return out.String()
+}
+
+func (t *freezerTable) dumpIndex(w io.Writer, start, stop int64) {
buf := make([]byte, indexEntrySize)
- fmt.Printf("| number | fileno | offset |\n")
- fmt.Printf("|--------|--------|--------|\n")
+ fmt.Fprintf(w, "| number | fileno | offset |\n")
+ fmt.Fprintf(w, "|--------|--------|--------|\n")
for i := uint64(start); ; i++ {
if _, err := t.index.ReadAt(buf, int64(i*indexEntrySize)); err != nil {
@@ -786,10 +733,10 @@ func (t *freezerTable) DumpIndex(start, stop int64) {
}
var entry indexEntry
entry.unmarshalBinary(buf)
- fmt.Printf("| %03d | %03d | %03d | \n", i, entry.filenum, entry.offset)
+ fmt.Fprintf(w, "| %03d | %03d | %03d | \n", i, entry.filenum, entry.offset)
if stop > 0 && i >= uint64(stop) {
break
}
}
- fmt.Printf("|--------------------------|\n")
+ fmt.Fprintf(w, "|--------------------------|\n")
}
diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go
index e8a8b5c46..803809b52 100644
--- a/core/rawdb/freezer_table_test.go
+++ b/core/rawdb/freezer_table_test.go
@@ -18,49 +18,36 @@ package rawdb
import (
"bytes"
- "encoding/binary"
"fmt"
- "io/ioutil"
"math/rand"
"os"
"path/filepath"
- "sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/metrics"
+ "github.com/stretchr/testify/require"
)
func init() {
rand.Seed(time.Now().Unix())
}
-// Gets a chunk of data, filled with 'b'
-func getChunk(size int, b int) []byte {
- data := make([]byte, size)
- for i := range data {
- data[i] = byte(b)
- }
- return data
-}
-
// TestFreezerBasics test initializing a freezertable from scratch, writing to the table,
// and reading it back.
func TestFreezerBasics(t *testing.T) {
t.Parallel()
// set cutoff at 50 bytes
- f, err := newCustomTable(os.TempDir(),
+ f, err := newTable(os.TempDir(),
fmt.Sprintf("unittest-%d", rand.Uint64()),
metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 50, true)
if err != nil {
t.Fatal(err)
}
defer f.Close()
+
// Write 15 bytes 255 times, results in 85 files
- for x := 0; x < 255; x++ {
- data := getChunk(15, x)
- f.Append(uint64(x), data)
- }
+ writeChunks(t, f, 255, 15)
//print(t, f, 0)
//print(t, f, 1)
@@ -98,16 +85,21 @@ func TestFreezerBasicsClosing(t *testing.T) {
f *freezerTable
err error
)
- f, err = newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
+ f, err = newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
- // Write 15 bytes 255 times, results in 85 files
+
+ // Write 15 bytes 255 times, results in 85 files.
+ // In-between writes, the table is closed and re-opened.
for x := 0; x < 255; x++ {
data := getChunk(15, x)
- f.Append(uint64(x), data)
+ batch := f.newBatch()
+ require.NoError(t, batch.AppendRaw(uint64(x), data))
+ require.NoError(t, batch.commit())
f.Close()
- f, err = newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
+
+ f, err = newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
@@ -124,7 +116,7 @@ func TestFreezerBasicsClosing(t *testing.T) {
t.Fatalf("test %d, got \n%x != \n%x", y, got, exp)
}
f.Close()
- f, err = newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
+ f, err = newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
@@ -137,22 +129,22 @@ func TestFreezerRepairDanglingHead(t *testing.T) {
rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge()
fname := fmt.Sprintf("dangling_headtest-%d", rand.Uint64())
- { // Fill table
- f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
+ // Fill table
+ {
+ f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
// Write 15 bytes 255 times
- for x := 0; x < 255; x++ {
- data := getChunk(15, x)
- f.Append(uint64(x), data)
- }
+ writeChunks(t, f, 255, 15)
+
// The last item should be there
if _, err = f.Retrieve(0xfe); err != nil {
t.Fatal(err)
}
f.Close()
}
+
// open the index
idxFile, err := os.OpenFile(filepath.Join(os.TempDir(), fmt.Sprintf("%s.ridx", fname)), os.O_RDWR, 0644)
if err != nil {
@@ -165,9 +157,10 @@ func TestFreezerRepairDanglingHead(t *testing.T) {
}
idxFile.Truncate(stat.Size() - 4)
idxFile.Close()
+
// Now open it again
{
- f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
+ f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
@@ -188,22 +181,22 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) {
rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge()
fname := fmt.Sprintf("dangling_headtest-%d", rand.Uint64())
- { // Fill a table and close it
- f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
+ // Fill a table and close it
+ {
+ f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
// Write 15 bytes 255 times
- for x := 0; x < 0xff; x++ {
- data := getChunk(15, x)
- f.Append(uint64(x), data)
- }
+ writeChunks(t, f, 255, 15)
+
// The last item should be there
if _, err = f.Retrieve(f.items - 1); err != nil {
t.Fatal(err)
}
f.Close()
}
+
// open the index
idxFile, err := os.OpenFile(filepath.Join(os.TempDir(), fmt.Sprintf("%s.ridx", fname)), os.O_RDWR, 0644)
if err != nil {
@@ -213,9 +206,10 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) {
// 0-indexEntry, 1-indexEntry, corrupt-indexEntry
idxFile.Truncate(indexEntrySize + indexEntrySize + indexEntrySize/2)
idxFile.Close()
+
// Now open it again
{
- f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
+ f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
@@ -228,15 +222,17 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) {
t.Errorf("Expected error for missing index entry")
}
// We should now be able to store items again, from item = 1
+ batch := f.newBatch()
for x := 1; x < 0xff; x++ {
- data := getChunk(15, ^x)
- f.Append(uint64(x), data)
+ require.NoError(t, batch.AppendRaw(uint64(x), getChunk(15, ^x)))
}
+ require.NoError(t, batch.commit())
f.Close()
}
+
// And if we open it, we should now be able to read all of them (new values)
{
- f, _ := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
+ f, _ := newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
for y := 1; y < 255; y++ {
exp := getChunk(15, ^y)
got, err := f.Retrieve(uint64(y))
@@ -255,22 +251,21 @@ func TestSnappyDetection(t *testing.T) {
t.Parallel()
rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge()
fname := fmt.Sprintf("snappytest-%d", rand.Uint64())
+
// Open with snappy
{
- f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
+ f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
// Write 15 bytes 255 times
- for x := 0; x < 0xff; x++ {
- data := getChunk(15, x)
- f.Append(uint64(x), data)
- }
+ writeChunks(t, f, 255, 15)
f.Close()
}
+
// Open without snappy
{
- f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, false)
+ f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, false)
if err != nil {
t.Fatal(err)
}
@@ -282,7 +277,7 @@ func TestSnappyDetection(t *testing.T) {
// Open with snappy
{
- f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
+ f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
@@ -292,8 +287,8 @@ func TestSnappyDetection(t *testing.T) {
t.Fatalf("expected no error, got %v", err)
}
}
-
}
+
func assertFileSize(f string, size int64) error {
stat, err := os.Stat(f)
if err != nil {
@@ -303,7 +298,6 @@ func assertFileSize(f string, size int64) error {
return fmt.Errorf("error, expected size %d, got %d", size, stat.Size())
}
return nil
-
}
// TestFreezerRepairDanglingIndex checks that if the index has more entries than there are data,
@@ -313,16 +307,15 @@ func TestFreezerRepairDanglingIndex(t *testing.T) {
rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge()
fname := fmt.Sprintf("dangling_indextest-%d", rand.Uint64())
- { // Fill a table and close it
- f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
+ // Fill a table and close it
+ {
+ f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
// Write 15 bytes 9 times : 150 bytes
- for x := 0; x < 9; x++ {
- data := getChunk(15, x)
- f.Append(uint64(x), data)
- }
+ writeChunks(t, f, 9, 15)
+
// The last item should be there
if _, err = f.Retrieve(f.items - 1); err != nil {
f.Close()
@@ -331,6 +324,7 @@ func TestFreezerRepairDanglingIndex(t *testing.T) {
f.Close()
// File sizes should be 45, 45, 45 : items[3, 3, 3)
}
+
// Crop third file
fileToCrop := filepath.Join(os.TempDir(), fmt.Sprintf("%s.0002.rdat", fname))
// Truncate third file: 45 ,45, 20
@@ -345,17 +339,18 @@ func TestFreezerRepairDanglingIndex(t *testing.T) {
file.Truncate(20)
file.Close()
}
+
// Open db it again
// It should restore the file(s) to
// 45, 45, 15
// with 3+3+1 items
{
- f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
+ f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
+ defer f.Close()
if f.items != 7 {
- f.Close()
t.Fatalf("expected %d items, got %d", 7, f.items)
}
if err := assertFileSize(fileToCrop, 15); err != nil {
@@ -365,30 +360,29 @@ func TestFreezerRepairDanglingIndex(t *testing.T) {
}
func TestFreezerTruncate(t *testing.T) {
-
t.Parallel()
rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge()
fname := fmt.Sprintf("truncation-%d", rand.Uint64())
- { // Fill table
- f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
+ // Fill table
+ {
+ f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
// Write 15 bytes 30 times
- for x := 0; x < 30; x++ {
- data := getChunk(15, x)
- f.Append(uint64(x), data)
- }
+ writeChunks(t, f, 30, 15)
+
// The last item should be there
if _, err = f.Retrieve(f.items - 1); err != nil {
t.Fatal(err)
}
f.Close()
}
+
// Reopen, truncate
{
- f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
+ f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
@@ -401,9 +395,7 @@ func TestFreezerTruncate(t *testing.T) {
if f.headBytes != 15 {
t.Fatalf("expected %d bytes, got %d", 15, f.headBytes)
}
-
}
-
}
// TestFreezerRepairFirstFile tests a head file with the very first item only half-written.
@@ -412,20 +404,26 @@ func TestFreezerRepairFirstFile(t *testing.T) {
t.Parallel()
rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge()
fname := fmt.Sprintf("truncationfirst-%d", rand.Uint64())
- { // Fill table
- f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
+
+ // Fill table
+ {
+ f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
// Write 80 bytes, splitting out into two files
- f.Append(0, getChunk(40, 0xFF))
- f.Append(1, getChunk(40, 0xEE))
+ batch := f.newBatch()
+ require.NoError(t, batch.AppendRaw(0, getChunk(40, 0xFF)))
+ require.NoError(t, batch.AppendRaw(1, getChunk(40, 0xEE)))
+ require.NoError(t, batch.commit())
+
// The last item should be there
- if _, err = f.Retrieve(f.items - 1); err != nil {
+ if _, err = f.Retrieve(1); err != nil {
t.Fatal(err)
}
f.Close()
}
+
// Truncate the file in half
fileToCrop := filepath.Join(os.TempDir(), fmt.Sprintf("%s.0001.rdat", fname))
{
@@ -439,9 +437,10 @@ func TestFreezerRepairFirstFile(t *testing.T) {
file.Truncate(20)
file.Close()
}
+
// Reopen
{
- f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
+ f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
@@ -449,9 +448,14 @@ func TestFreezerRepairFirstFile(t *testing.T) {
f.Close()
t.Fatalf("expected %d items, got %d", 0, f.items)
}
+
// Write 40 bytes
- f.Append(1, getChunk(40, 0xDD))
+ batch := f.newBatch()
+ require.NoError(t, batch.AppendRaw(1, getChunk(40, 0xDD)))
+ require.NoError(t, batch.commit())
+
f.Close()
+
// Should have been truncated down to zero and then 40 written
if err := assertFileSize(fileToCrop, 40); err != nil {
t.Fatal(err)
@@ -468,25 +472,26 @@ func TestFreezerReadAndTruncate(t *testing.T) {
t.Parallel()
rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge()
fname := fmt.Sprintf("read_truncate-%d", rand.Uint64())
- { // Fill table
- f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
+
+ // Fill table
+ {
+ f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
// Write 15 bytes 30 times
- for x := 0; x < 30; x++ {
- data := getChunk(15, x)
- f.Append(uint64(x), data)
- }
+ writeChunks(t, f, 30, 15)
+
// The last item should be there
if _, err = f.Retrieve(f.items - 1); err != nil {
t.Fatal(err)
}
f.Close()
}
+
// Reopen and read all files
{
- f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
+ f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
@@ -497,40 +502,48 @@ func TestFreezerReadAndTruncate(t *testing.T) {
for y := byte(0); y < 30; y++ {
f.Retrieve(uint64(y))
}
+
// Now, truncate back to zero
f.truncate(0)
+
// Write the data again
+ batch := f.newBatch()
for x := 0; x < 30; x++ {
- data := getChunk(15, ^x)
- if err := f.Append(uint64(x), data); err != nil {
- t.Fatalf("error %v", err)
- }
+ require.NoError(t, batch.AppendRaw(uint64(x), getChunk(15, ^x)))
}
+ require.NoError(t, batch.commit())
f.Close()
}
}
-func TestOffset(t *testing.T) {
+func TestFreezerOffset(t *testing.T) {
t.Parallel()
rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge()
fname := fmt.Sprintf("offset-%d", rand.Uint64())
- { // Fill table
- f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 40, true)
+
+ // Fill table
+ {
+ f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true)
if err != nil {
t.Fatal(err)
}
+
// Write 6 x 20 bytes, splitting out into three files
- f.Append(0, getChunk(20, 0xFF))
- f.Append(1, getChunk(20, 0xEE))
+ batch := f.newBatch()
+ require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF)))
+ require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE)))
- f.Append(2, getChunk(20, 0xdd))
- f.Append(3, getChunk(20, 0xcc))
+ require.NoError(t, batch.AppendRaw(2, getChunk(20, 0xdd)))
+ require.NoError(t, batch.AppendRaw(3, getChunk(20, 0xcc)))
- f.Append(4, getChunk(20, 0xbb))
- f.Append(5, getChunk(20, 0xaa))
- f.DumpIndex(0, 100)
+ require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xbb)))
+ require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xaa)))
+ require.NoError(t, batch.commit())
+
+ t.Log(f.dumpIndexString(0, 100))
f.Close()
}
+
// Now crop it.
{
// delete files 0 and 1
@@ -558,7 +571,7 @@ func TestOffset(t *testing.T) {
filenum: tailId,
offset: itemOffset,
}
- buf := zeroIndex.marshallBinary()
+ buf := zeroIndex.append(nil)
// Overwrite index zero
copy(indexBuf, buf)
// Remove the four next indices by overwriting
@@ -567,44 +580,36 @@ func TestOffset(t *testing.T) {
// Need to truncate the moved index items
indexFile.Truncate(indexEntrySize * (1 + 2))
indexFile.Close()
-
}
+
// Now open again
- checkPresent := func(numDeleted uint64) {
- f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 40, true)
+ {
+ f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true)
if err != nil {
t.Fatal(err)
}
- f.DumpIndex(0, 100)
- // It should allow writing item 6
- f.Append(numDeleted+2, getChunk(20, 0x99))
+ defer f.Close()
+ t.Log(f.dumpIndexString(0, 100))
- // It should be fine to fetch 4,5,6
- if got, err := f.Retrieve(numDeleted); err != nil {
- t.Fatal(err)
- } else if exp := getChunk(20, 0xbb); !bytes.Equal(got, exp) {
- t.Fatalf("expected %x got %x", exp, got)
- }
- if got, err := f.Retrieve(numDeleted + 1); err != nil {
- t.Fatal(err)
- } else if exp := getChunk(20, 0xaa); !bytes.Equal(got, exp) {
- t.Fatalf("expected %x got %x", exp, got)
- }
- if got, err := f.Retrieve(numDeleted + 2); err != nil {
- t.Fatal(err)
- } else if exp := getChunk(20, 0x99); !bytes.Equal(got, exp) {
- t.Fatalf("expected %x got %x", exp, got)
- }
+ // It should allow writing item 6.
+ batch := f.newBatch()
+ require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x99)))
+ require.NoError(t, batch.commit())
- // It should error at 0, 1,2,3
- for i := numDeleted - 1; i > numDeleted-10; i-- {
- if _, err := f.Retrieve(i); err == nil {
- t.Fatal("expected err")
- }
- }
+ checkRetrieveError(t, f, map[uint64]error{
+ 0: errOutOfBounds,
+ 1: errOutOfBounds,
+ 2: errOutOfBounds,
+ 3: errOutOfBounds,
+ })
+ checkRetrieve(t, f, map[uint64][]byte{
+ 4: getChunk(20, 0xbb),
+ 5: getChunk(20, 0xaa),
+ 6: getChunk(20, 0x99),
+ })
}
- checkPresent(4)
- // Now, let's pretend we have deleted 1M items
+
+ // Edit the index again, with a much larger initial offset of 1M.
{
// Read the index file
p := filepath.Join(os.TempDir(), fmt.Sprintf("%v.ridx", fname))
@@ -624,13 +629,71 @@ func TestOffset(t *testing.T) {
offset: itemOffset,
filenum: tailId,
}
- buf := zeroIndex.marshallBinary()
+ buf := zeroIndex.append(nil)
// Overwrite index zero
copy(indexBuf, buf)
indexFile.WriteAt(indexBuf, 0)
indexFile.Close()
}
- checkPresent(1000000)
+
+ // Check that existing items have been moved to index 1M.
+ {
+ f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer f.Close()
+ t.Log(f.dumpIndexString(0, 100))
+
+ checkRetrieveError(t, f, map[uint64]error{
+ 0: errOutOfBounds,
+ 1: errOutOfBounds,
+ 2: errOutOfBounds,
+ 3: errOutOfBounds,
+ 999999: errOutOfBounds,
+ })
+ checkRetrieve(t, f, map[uint64][]byte{
+ 1000000: getChunk(20, 0xbb),
+ 1000001: getChunk(20, 0xaa),
+ })
+ }
+}
+
+func checkRetrieve(t *testing.T, f *freezerTable, items map[uint64][]byte) {
+ t.Helper()
+
+ for item, wantBytes := range items {
+ value, err := f.Retrieve(item)
+ if err != nil {
+ t.Fatalf("can't get expected item %d: %v", item, err)
+ }
+ if !bytes.Equal(value, wantBytes) {
+ t.Fatalf("item %d has wrong value %x (want %x)", item, value, wantBytes)
+ }
+ }
+}
+
+func checkRetrieveError(t *testing.T, f *freezerTable, items map[uint64]error) {
+ t.Helper()
+
+ for item, wantError := range items {
+ value, err := f.Retrieve(item)
+ if err == nil {
+ t.Fatalf("unexpected value %x for item %d, want error %v", item, value, wantError)
+ }
+ if err != wantError {
+ t.Fatalf("wrong error for item %d: %v", item, err)
+ }
+ }
+}
+
+// Gets a chunk of data, filled with 'b'
+func getChunk(size int, b int) []byte {
+ data := make([]byte, size)
+ for i := range data {
+ data[i] = byte(b)
+ }
+ return data
}
// TODO (?)
@@ -644,53 +707,18 @@ func TestOffset(t *testing.T) {
// should be handled already, and the case described above can only (?) happen if an
// external process/user deletes files from the filesystem.
-// TestAppendTruncateParallel is a test to check if the Append/truncate operations are
-// racy.
-//
-// The reason why it's not a regular fuzzer, within tests/fuzzers, is that it is dependent
-// on timing rather than 'clever' input -- there's no determinism.
-func TestAppendTruncateParallel(t *testing.T) {
- dir, err := ioutil.TempDir("", "freezer")
- if err != nil {
- t.Fatal(err)
- }
- defer os.RemoveAll(dir)
+func writeChunks(t *testing.T, ft *freezerTable, n int, length int) {
+ t.Helper()
- f, err := newCustomTable(dir, "tmp", metrics.NilMeter{}, metrics.NilMeter{}, metrics.NilGauge{}, 8, true)
- if err != nil {
- t.Fatal(err)
- }
-
- fill := func(mark uint64) []byte {
- data := make([]byte, 8)
- binary.LittleEndian.PutUint64(data, mark)
- return data
- }
-
- for i := 0; i < 5000; i++ {
- f.truncate(0)
- data0 := fill(0)
- f.Append(0, data0)
- data1 := fill(1)
-
- var wg sync.WaitGroup
- wg.Add(2)
- go func() {
- f.truncate(0)
- wg.Done()
- }()
- go func() {
- f.Append(1, data1)
- wg.Done()
- }()
- wg.Wait()
-
- if have, err := f.Retrieve(0); err == nil {
- if !bytes.Equal(have, data0) {
- t.Fatalf("have %x want %x", have, data0)
- }
+ batch := ft.newBatch()
+ for i := 0; i < n; i++ {
+ if err := batch.AppendRaw(uint64(i), getChunk(length, i)); err != nil {
+ t.Fatalf("AppendRaw(%d, ...) returned error: %v", i, err)
}
}
+ if err := batch.commit(); err != nil {
+ t.Fatalf("Commit returned error: %v", err)
+ }
}
// TestSequentialRead does some basic tests on the RetrieveItems.
@@ -698,20 +726,17 @@ func TestSequentialRead(t *testing.T) {
rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge()
fname := fmt.Sprintf("batchread-%d", rand.Uint64())
{ // Fill table
- f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
+ f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
// Write 15 bytes 30 times
- for x := 0; x < 30; x++ {
- data := getChunk(15, x)
- f.Append(uint64(x), data)
- }
+ writeChunks(t, f, 30, 15)
f.DumpIndex(0, 30)
f.Close()
}
{ // Open it, iterate, verify iteration
- f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
+ f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
@@ -732,7 +757,7 @@ func TestSequentialRead(t *testing.T) {
}
{ // Open it, iterate, verify byte limit. The byte limit is less than item
// size, so each lookup should only return one item
- f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 40, true)
+ f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true)
if err != nil {
t.Fatal(err)
}
@@ -761,16 +786,13 @@ func TestSequentialReadByteLimit(t *testing.T) {
rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge()
fname := fmt.Sprintf("batchread-2-%d", rand.Uint64())
{ // Fill table
- f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 100, true)
+ f, err := newTable(os.TempDir(), fname, rm, wm, sg, 100, true)
if err != nil {
t.Fatal(err)
}
// Write 10 bytes 30 times,
// Splitting it at every 100 bytes (10 items)
- for x := 0; x < 30; x++ {
- data := getChunk(10, x)
- f.Append(uint64(x), data)
- }
+ writeChunks(t, f, 30, 10)
f.Close()
}
for i, tc := range []struct {
@@ -786,7 +808,7 @@ func TestSequentialReadByteLimit(t *testing.T) {
{100, 109, 10},
} {
{
- f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 100, true)
+ f, err := newTable(os.TempDir(), fname, rm, wm, sg, 100, true)
if err != nil {
t.Fatal(err)
}
diff --git a/core/rawdb/freezer_test.go b/core/rawdb/freezer_test.go
new file mode 100644
index 000000000..7359131c8
--- /dev/null
+++ b/core/rawdb/freezer_test.go
@@ -0,0 +1,301 @@
+// Copyright 2021 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+package rawdb
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+ "io/ioutil"
+ "math/big"
+ "math/rand"
+ "os"
+ "sync"
+ "testing"
+
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/rlp"
+ "github.com/stretchr/testify/require"
+)
+
+var freezerTestTableDef = map[string]bool{"test": true}
+
+func TestFreezerModify(t *testing.T) {
+ t.Parallel()
+
+ // Create test data.
+ var valuesRaw [][]byte
+ var valuesRLP []*big.Int
+ for x := 0; x < 100; x++ {
+ v := getChunk(256, x)
+ valuesRaw = append(valuesRaw, v)
+ iv := big.NewInt(int64(x))
+ iv = iv.Exp(iv, iv, nil)
+ valuesRLP = append(valuesRLP, iv)
+ }
+
+ tables := map[string]bool{"raw": true, "rlp": false}
+ f, dir := newFreezerForTesting(t, tables)
+ defer os.RemoveAll(dir)
+ defer f.Close()
+
+ // Commit test data.
+ _, err := f.ModifyAncients(func(op ethdb.AncientWriteOp) error {
+ for i := range valuesRaw {
+ if err := op.AppendRaw("raw", uint64(i), valuesRaw[i]); err != nil {
+ return err
+ }
+ if err := op.Append("rlp", uint64(i), valuesRLP[i]); err != nil {
+ return err
+ }
+ }
+ return nil
+ })
+ if err != nil {
+ t.Fatal("ModifyAncients failed:", err)
+ }
+
+ // Dump indexes.
+ for _, table := range f.tables {
+ t.Log(table.name, "index:", table.dumpIndexString(0, int64(len(valuesRaw))))
+ }
+
+ // Read back test data.
+ checkAncientCount(t, f, "raw", uint64(len(valuesRaw)))
+ checkAncientCount(t, f, "rlp", uint64(len(valuesRLP)))
+ for i := range valuesRaw {
+ v, _ := f.Ancient("raw", uint64(i))
+ if !bytes.Equal(v, valuesRaw[i]) {
+ t.Fatalf("wrong raw value at %d: %x", i, v)
+ }
+ ivEnc, _ := f.Ancient("rlp", uint64(i))
+ want, _ := rlp.EncodeToBytes(valuesRLP[i])
+ if !bytes.Equal(ivEnc, want) {
+ t.Fatalf("wrong RLP value at %d: %x", i, ivEnc)
+ }
+ }
+}
+
+// This checks that ModifyAncients rolls back freezer updates
+// when the function passed to it returns an error.
+func TestFreezerModifyRollback(t *testing.T) {
+ t.Parallel()
+
+ f, dir := newFreezerForTesting(t, freezerTestTableDef)
+ defer os.RemoveAll(dir)
+
+ theError := errors.New("oops")
+ _, err := f.ModifyAncients(func(op ethdb.AncientWriteOp) error {
+ // Append three items. This creates two files immediately,
+ // because the table size limit of the test freezer is 2048.
+ require.NoError(t, op.AppendRaw("test", 0, make([]byte, 2048)))
+ require.NoError(t, op.AppendRaw("test", 1, make([]byte, 2048)))
+ require.NoError(t, op.AppendRaw("test", 2, make([]byte, 2048)))
+ return theError
+ })
+ if err != theError {
+ t.Errorf("ModifyAncients returned wrong error %q", err)
+ }
+ checkAncientCount(t, f, "test", 0)
+ f.Close()
+
+ // Reopen and check that the rolled-back data doesn't reappear.
+ tables := map[string]bool{"test": true}
+ f2, err := newFreezer(dir, "", false, 2049, tables)
+ if err != nil {
+ t.Fatalf("can't reopen freezer after failed ModifyAncients: %v", err)
+ }
+ defer f2.Close()
+ checkAncientCount(t, f2, "test", 0)
+}
+
+// This test runs ModifyAncients and Ancient concurrently with each other.
+func TestFreezerConcurrentModifyRetrieve(t *testing.T) {
+ t.Parallel()
+
+ f, dir := newFreezerForTesting(t, freezerTestTableDef)
+ defer os.RemoveAll(dir)
+ defer f.Close()
+
+ var (
+ numReaders = 5
+ writeBatchSize = uint64(50)
+ written = make(chan uint64, numReaders*6)
+ wg sync.WaitGroup
+ )
+ wg.Add(numReaders + 1)
+
+ // Launch the writer. It appends 10000 items in batches.
+ go func() {
+ defer wg.Done()
+ defer close(written)
+ for item := uint64(0); item < 10000; item += writeBatchSize {
+ _, err := f.ModifyAncients(func(op ethdb.AncientWriteOp) error {
+ for i := uint64(0); i < writeBatchSize; i++ {
+ item := item + i
+ value := getChunk(32, int(item))
+ if err := op.AppendRaw("test", item, value); err != nil {
+ return err
+ }
+ }
+ return nil
+ })
+ if err != nil {
+ panic(err)
+ }
+ for i := 0; i < numReaders; i++ {
+ written <- item + writeBatchSize
+ }
+ }
+ }()
+
+ // Launch the readers. They read random items from the freezer up to the
+ // current frozen item count.
+ for i := 0; i < numReaders; i++ {
+ go func() {
+ defer wg.Done()
+ for frozen := range written {
+ for rc := 0; rc < 80; rc++ {
+ num := uint64(rand.Intn(int(frozen)))
+ value, err := f.Ancient("test", num)
+ if err != nil {
+ panic(fmt.Errorf("error reading %d (frozen %d): %v", num, frozen, err))
+ }
+ if !bytes.Equal(value, getChunk(32, int(num))) {
+ panic(fmt.Errorf("wrong value at %d", num))
+ }
+ }
+ }
+ }()
+ }
+
+ wg.Wait()
+}
+
+// This test runs ModifyAncients and TruncateAncients concurrently with each other.
+func TestFreezerConcurrentModifyTruncate(t *testing.T) {
+ f, dir := newFreezerForTesting(t, freezerTestTableDef)
+ defer os.RemoveAll(dir)
+ defer f.Close()
+
+ var item = make([]byte, 256)
+
+ for i := 0; i < 1000; i++ {
+ // First reset and write 100 items.
+ if err := f.TruncateAncients(0); err != nil {
+ t.Fatal("truncate failed:", err)
+ }
+ _, err := f.ModifyAncients(func(op ethdb.AncientWriteOp) error {
+ for i := uint64(0); i < 100; i++ {
+ if err := op.AppendRaw("test", i, item); err != nil {
+ return err
+ }
+ }
+ return nil
+ })
+ if err != nil {
+ t.Fatal("modify failed:", err)
+ }
+ checkAncientCount(t, f, "test", 100)
+
+ // Now append 100 more items and truncate concurrently.
+ var (
+ wg sync.WaitGroup
+ truncateErr error
+ modifyErr error
+ )
+ wg.Add(3)
+ go func() {
+ _, modifyErr = f.ModifyAncients(func(op ethdb.AncientWriteOp) error {
+ for i := uint64(100); i < 200; i++ {
+ if err := op.AppendRaw("test", i, item); err != nil {
+ return err
+ }
+ }
+ return nil
+ })
+ wg.Done()
+ }()
+ go func() {
+ truncateErr = f.TruncateAncients(10)
+ wg.Done()
+ }()
+ go func() {
+ f.AncientSize("test")
+ wg.Done()
+ }()
+ wg.Wait()
+
+ // Now check the outcome. If the truncate operation went through first, the append
+ // fails, otherwise it succeeds. In either case, the freezer should be positioned
+ // at 10 after both operations are done.
+ if truncateErr != nil {
+ t.Fatal("concurrent truncate failed:", err)
+ }
+ if !(modifyErr == nil || modifyErr == errOutOrderInsertion) {
+ t.Fatal("wrong error from concurrent modify:", modifyErr)
+ }
+ checkAncientCount(t, f, "test", 10)
+ }
+}
+
+func newFreezerForTesting(t *testing.T, tables map[string]bool) (*freezer, string) {
+ t.Helper()
+
+ dir, err := ioutil.TempDir("", "freezer")
+ if err != nil {
+ t.Fatal(err)
+ }
+ // note: using low max table size here to ensure the tests actually
+ // switch between multiple files.
+ f, err := newFreezer(dir, "", false, 2049, tables)
+ if err != nil {
+ t.Fatal("can't open freezer", err)
+ }
+ return f, dir
+}
+
+// checkAncientCount verifies that the freezer contains n items.
+func checkAncientCount(t *testing.T, f *freezer, kind string, n uint64) {
+ t.Helper()
+
+ if frozen, _ := f.Ancients(); frozen != n {
+ t.Fatalf("Ancients() returned %d, want %d", frozen, n)
+ }
+
+ // Check at index n-1.
+ if n > 0 {
+ index := n - 1
+ if ok, _ := f.HasAncient(kind, index); !ok {
+ t.Errorf("HasAncient(%q, %d) returned false unexpectedly", kind, index)
+ }
+ if _, err := f.Ancient(kind, index); err != nil {
+ t.Errorf("Ancient(%q, %d) returned unexpected error %q", kind, index, err)
+ }
+ }
+
+ // Check at index n.
+ index := n
+ if ok, _ := f.HasAncient(kind, index); ok {
+ t.Errorf("HasAncient(%q, %d) returned true unexpectedly", kind, index)
+ }
+ if _, err := f.Ancient(kind, index); err == nil {
+ t.Errorf("Ancient(%q, %d) didn't return expected error", kind, index)
+ } else if err != errOutOfBounds {
+ t.Errorf("Ancient(%q, %d) returned unexpected error %q", kind, index, err)
+ }
+}
diff --git a/core/rawdb/table.go b/core/rawdb/table.go
index 586451c06..02e23b517 100644
--- a/core/rawdb/table.go
+++ b/core/rawdb/table.go
@@ -80,10 +80,9 @@ func (t *table) AncientSize(kind string) (uint64, error) {
return t.db.AncientSize(kind)
}
-// AppendAncient is a noop passthrough that just forwards the request to the underlying
-// database.
-func (t *table) AppendAncient(number uint64, hash, header, body, receipts, td []byte) error {
- return t.db.AppendAncient(number, hash, header, body, receipts, td)
+// ModifyAncients runs an ancient write operation on the underlying database.
+func (t *table) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (int64, error) {
+ return t.db.ModifyAncients(fn)
}
// TruncateAncients is a noop passthrough that just forwards the request to the underlying
diff --git a/ethdb/database.go b/ethdb/database.go
index bdc09d5e9..3c6500d1d 100644
--- a/ethdb/database.go
+++ b/ethdb/database.go
@@ -92,9 +92,10 @@ type AncientReader interface {
// AncientWriter contains the methods required to write to immutable ancient data.
type AncientWriter interface {
- // AppendAncient injects all binary blobs belong to block at the end of the
- // append-only immutable table files.
- AppendAncient(number uint64, hash, header, body, receipt, td []byte) error
+ // ModifyAncients runs a write operation on the ancient store.
+ // If the function returns an error, any changes to the underlying store are reverted.
+ // The integer return value is the total size of the written data.
+ ModifyAncients(func(AncientWriteOp) error) (int64, error)
// TruncateAncients discards all but the first n ancient data from the ancient store.
TruncateAncients(n uint64) error
@@ -103,6 +104,15 @@ type AncientWriter interface {
Sync() error
}
+// AncientWriteOp is given to the function argument of ModifyAncients.
+type AncientWriteOp interface {
+ // Append adds an RLP-encoded item.
+ Append(kind string, number uint64, item interface{}) error
+
+ // AppendRaw adds an item without RLP-encoding it.
+ AppendRaw(kind string, number uint64, item []byte) error
+}
+
// Reader contains the methods required to read data from both key-value as well as
// immutable ancient data.
type Reader interface {