Patch for concurrent iterator & others (onto v1.11.6) #386

Closed
roysc wants to merge 1565 commits from v1.11.6-statediff-v5 into master
13 changed files with 1349 additions and 590 deletions
Showing only changes of commit 794c6133ef - Show all commits

View File

@ -208,7 +208,6 @@ type BlockChain struct {
vmConfig vm.Config
shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion.
}
// NewBlockChain returns a fully initialised block chain using information
@ -1085,38 +1084,6 @@ const (
SideStatTy
)
// truncateAncient rewinds the blockchain to the specified header and deletes all
// data in the ancient store that exceeds the specified header.
func (bc *BlockChain) truncateAncient(head uint64) error {
frozen, err := bc.db.Ancients()
if err != nil {
return err
}
// Short circuit if there is no data to truncate in ancient store.
if frozen <= head+1 {
return nil
}
// Truncate all the data in the freezer beyond the specified head
if err := bc.db.TruncateAncients(head + 1); err != nil {
return err
}
// Clear out any stale content from the caches
bc.hc.headerCache.Purge()
bc.hc.tdCache.Purge()
bc.hc.numberCache.Purge()
// Clear out any stale content from the caches
bc.bodyCache.Purge()
bc.bodyRLPCache.Purge()
bc.receiptsCache.Purge()
bc.blockCache.Purge()
bc.txLookupCache.Purge()
bc.futureBlocks.Purge()
log.Info("Rewind ancient data", "number", head)
return nil
}
// numberHash is just a container for a number and a hash, to represent a block
type numberHash struct {
number uint64
@ -1155,12 +1122,14 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
var (
stats = struct{ processed, ignored int32 }{}
start = time.Now()
size = 0
size = int64(0)
)
// updateHead updates the head fast sync block if the inserted blocks are better
// and returns an indicator whether the inserted blocks are canonical.
updateHead := func(head *types.Block) bool {
bc.chainmu.Lock()
defer bc.chainmu.Unlock()
// Rewind may have occurred, skip in that case.
if bc.CurrentHeader().Number.Cmp(head.Number()) >= 0 {
@ -1169,56 +1138,49 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
rawdb.WriteHeadFastBlockHash(bc.db, head.Hash())
bc.currentFastBlock.Store(head)
headFastBlockGauge.Update(int64(head.NumberU64()))
bc.chainmu.Unlock()
return true
}
}
bc.chainmu.Unlock()
return false
}
// writeAncient writes blockchain and corresponding receipt chain into ancient store.
//
// this function only accepts canonical chain data. All side chain will be reverted
// eventually.
writeAncient := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
var (
previous = bc.CurrentFastBlock()
batch = bc.db.NewBatch()
)
// If any error occurs before updating the head or we are inserting a side chain,
// all the data written this time wll be rolled back.
defer func() {
if previous != nil {
if err := bc.truncateAncient(previous.NumberU64()); err != nil {
log.Crit("Truncate ancient store failed", "err", err)
}
}
}()
var deleted []*numberHash
for i, block := range blockChain {
// Short circuit insertion if shutting down or processing failed
if bc.insertStopped() {
return 0, errInsertionInterrupted
}
// Short circuit insertion if it is required(used in testing only)
if bc.terminateInsert != nil && bc.terminateInsert(block.Hash(), block.NumberU64()) {
return i, errors.New("insertion is terminated for testing purpose")
}
// Short circuit if the owner header is unknown
if !bc.HasHeader(block.Hash(), block.NumberU64()) {
return i, fmt.Errorf("containing header #%d [%x..] unknown", block.Number(), block.Hash().Bytes()[:4])
}
if block.NumberU64() == 1 {
// Make sure to write the genesis into the freezer
first := blockChain[0]
last := blockChain[len(blockChain)-1]
// Ensure genesis is in ancients.
if first.NumberU64() == 1 {
if frozen, _ := bc.db.Ancients(); frozen == 0 {
h := rawdb.ReadCanonicalHash(bc.db, 0)
b := rawdb.ReadBlock(bc.db, h, 0)
size += rawdb.WriteAncientBlock(bc.db, b, rawdb.ReadReceipts(bc.db, h, 0, bc.chainConfig), rawdb.ReadTd(bc.db, h, 0))
b := bc.genesisBlock
td := bc.genesisBlock.Difficulty()
writeSize, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{b}, []types.Receipts{nil}, td)
size += writeSize
if err != nil {
log.Error("Error writing genesis to ancients", "err", err)
return 0, err
}
log.Info("Wrote genesis to ancients")
}
}
// Flush data into ancient database.
size += rawdb.WriteAncientBlock(bc.db, block, receiptChain[i], bc.GetTd(block.Hash(), block.NumberU64()))
// Before writing the blocks to the ancients, we need to ensure that
// they correspond to the what the headerchain 'expects'.
// We only check the last block/header, since it's a contiguous chain.
if !bc.HasHeader(last.Hash(), last.NumberU64()) {
return 0, fmt.Errorf("containing header #%d [%x..] unknown", last.Number(), last.Hash().Bytes()[:4])
}
// Write all chain data to ancients.
td := bc.GetTd(first.Hash(), first.NumberU64())
writeSize, err := rawdb.WriteAncientBlocks(bc.db, blockChain, receiptChain, td)
size += writeSize
if err != nil {
log.Error("Error importing chain data to ancients", "err", err)
return 0, err
}
// Write tx indices if any condition is satisfied:
// * If user requires to reserve all tx indices(txlookuplimit=0)
@ -1231,6 +1193,8 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
// a background routine to re-indexed all indices in [ancients - txlookupLimit, ancients)
// range. In this case, all tx indices of newly imported blocks should be
// generated.
var batch = bc.db.NewBatch()
for _, block := range blockChain {
if bc.txLookupLimit == 0 || ancientLimit <= bc.txLookupLimit || block.NumberU64() >= ancientLimit-bc.txLookupLimit {
rawdb.WriteTxLookupEntriesByBlock(batch, block)
} else if rawdb.ReadTxIndexTail(bc.db) != nil {
@ -1238,51 +1202,50 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
}
stats.processed++
}
// Flush all tx-lookup index data.
size += batch.ValueSize()
size += int64(batch.ValueSize())
if err := batch.Write(); err != nil {
// The tx index data could not be written.
// Roll back the ancient store update.
fastBlock := bc.CurrentFastBlock().NumberU64()
if err := bc.db.TruncateAncients(fastBlock + 1); err != nil {
log.Error("Can't truncate ancient store after failed insert", "err", err)
}
return 0, err
}
batch.Reset()
// Sync the ancient store explicitly to ensure all data has been flushed to disk.
if err := bc.db.Sync(); err != nil {
return 0, err
}
// Update the current fast block because all block data is now present in DB.
previousFastBlock := bc.CurrentFastBlock().NumberU64()
if !updateHead(blockChain[len(blockChain)-1]) {
return 0, errors.New("side blocks can't be accepted as the ancient chain data")
// We end up here if the header chain has reorg'ed, and the blocks/receipts
// don't match the canonical chain.
if err := bc.db.TruncateAncients(previousFastBlock + 1); err != nil {
log.Error("Can't truncate ancient store after failed insert", "err", err)
}
return 0, errSideChainReceipts
}
previous = nil // disable rollback explicitly
// Wipe out canonical block data.
for _, nh := range deleted {
rawdb.DeleteBlockWithoutNumber(batch, nh.hash, nh.number)
rawdb.DeleteCanonicalHash(batch, nh.number)
}
for _, block := range blockChain {
// Always keep genesis block in active database.
if block.NumberU64() != 0 {
rawdb.DeleteBlockWithoutNumber(batch, block.Hash(), block.NumberU64())
rawdb.DeleteCanonicalHash(batch, block.NumberU64())
}
}
if err := batch.Write(); err != nil {
return 0, err
}
// Delete block data from the main database.
batch.Reset()
// Wipe out side chain too.
for _, nh := range deleted {
for _, hash := range rawdb.ReadAllHashes(bc.db, nh.number) {
rawdb.DeleteBlock(batch, hash, nh.number)
}
}
canonHashes := make(map[common.Hash]struct{})
for _, block := range blockChain {
// Always keep genesis block in active database.
if block.NumberU64() != 0 {
for _, hash := range rawdb.ReadAllHashes(bc.db, block.NumberU64()) {
rawdb.DeleteBlock(batch, hash, block.NumberU64())
canonHashes[block.Hash()] = struct{}{}
if block.NumberU64() == 0 {
continue
}
rawdb.DeleteCanonicalHash(batch, block.NumberU64())
rawdb.DeleteBlockWithoutNumber(batch, block.Hash(), block.NumberU64())
}
// Delete side chain hash-to-number mappings.
for _, nh := range rawdb.ReadAllHashesInRange(bc.db, first.NumberU64(), last.NumberU64()) {
if _, canon := canonHashes[nh.Hash]; !canon {
rawdb.DeleteHeader(batch, nh.Hash, nh.Number)
}
}
if err := batch.Write(); err != nil {
@ -1290,6 +1253,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
}
return 0, nil
}
// writeLive writes blockchain and corresponding receipt chain into active store.
writeLive := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
skipPresenceCheck := false
@ -1327,7 +1291,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
if err := batch.Write(); err != nil {
return 0, err
}
size += batch.ValueSize()
size += int64(batch.ValueSize())
batch.Reset()
}
stats.processed++
@ -1336,7 +1300,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
// we can ensure all components of body is completed(body, receipts,
// tx indexes)
if batch.ValueSize() > 0 {
size += batch.ValueSize()
size += int64(batch.ValueSize())
if err := batch.Write(); err != nil {
return 0, err
}
@ -1344,6 +1308,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
updateHead(blockChain[len(blockChain)-1])
return 0, nil
}
// Write downloaded chain data and corresponding receipt chain data
if len(ancientBlocks) > 0 {
if n, err := writeAncient(ancientBlocks, ancientReceipts); err != nil {

View File

@ -670,6 +670,7 @@ func TestFastVsFullChains(t *testing.T) {
if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(len(blocks)/2)); err != nil {
t.Fatalf("failed to insert receipt %d: %v", n, err)
}
// Iterate over all chain data components, and cross reference
for i := 0; i < len(blocks); i++ {
num, hash := blocks[i].NumberU64(), blocks[i].Hash()
@ -693,10 +694,27 @@ func TestFastVsFullChains(t *testing.T) {
} else if types.CalcUncleHash(fblock.Uncles()) != types.CalcUncleHash(arblock.Uncles()) || types.CalcUncleHash(anblock.Uncles()) != types.CalcUncleHash(arblock.Uncles()) {
t.Errorf("block #%d [%x]: uncles mismatch: fastdb %v, ancientdb %v, archivedb %v", num, hash, fblock.Uncles(), anblock, arblock.Uncles())
}
if freceipts, anreceipts, areceipts := rawdb.ReadReceipts(fastDb, hash, *rawdb.ReadHeaderNumber(fastDb, hash), fast.Config()), rawdb.ReadReceipts(ancientDb, hash, *rawdb.ReadHeaderNumber(ancientDb, hash), fast.Config()), rawdb.ReadReceipts(archiveDb, hash, *rawdb.ReadHeaderNumber(archiveDb, hash), fast.Config()); types.DeriveSha(freceipts, trie.NewStackTrie(nil)) != types.DeriveSha(areceipts, trie.NewStackTrie(nil)) {
// Check receipts.
freceipts := rawdb.ReadReceipts(fastDb, hash, num, fast.Config())
anreceipts := rawdb.ReadReceipts(ancientDb, hash, num, fast.Config())
areceipts := rawdb.ReadReceipts(archiveDb, hash, num, fast.Config())
if types.DeriveSha(freceipts, trie.NewStackTrie(nil)) != types.DeriveSha(areceipts, trie.NewStackTrie(nil)) {
t.Errorf("block #%d [%x]: receipts mismatch: fastdb %v, ancientdb %v, archivedb %v", num, hash, freceipts, anreceipts, areceipts)
}
// Check that hash-to-number mappings are present in all databases.
if m := rawdb.ReadHeaderNumber(fastDb, hash); m == nil || *m != num {
t.Errorf("block #%d [%x]: wrong hash-to-number mapping in fastdb: %v", num, hash, m)
}
if m := rawdb.ReadHeaderNumber(ancientDb, hash); m == nil || *m != num {
t.Errorf("block #%d [%x]: wrong hash-to-number mapping in ancientdb: %v", num, hash, m)
}
if m := rawdb.ReadHeaderNumber(archiveDb, hash); m == nil || *m != num {
t.Errorf("block #%d [%x]: wrong hash-to-number mapping in archivedb: %v", num, hash, m)
}
}
// Check that the canonical chains are the same between the databases
for i := 0; i < len(blocks)+1; i++ {
if fhash, ahash := rawdb.ReadCanonicalHash(fastDb, uint64(i)), rawdb.ReadCanonicalHash(archiveDb, uint64(i)); fhash != ahash {
@ -1639,20 +1657,34 @@ func TestBlockchainRecovery(t *testing.T) {
}
}
func TestIncompleteAncientReceiptChainInsertion(t *testing.T) {
// Configure and generate a sample block chain
var (
gendb = rawdb.NewMemoryDatabase()
key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
address = crypto.PubkeyToAddress(key.PublicKey)
funds = big.NewInt(1000000000)
gspec = &Genesis{Config: params.TestChainConfig, Alloc: GenesisAlloc{address: {Balance: funds}}}
genesis = gspec.MustCommit(gendb)
)
height := uint64(1024)
blocks, receipts := GenerateChain(gspec.Config, genesis, ethash.NewFaker(), gendb, int(height), nil)
// This test checks that InsertReceiptChain will roll back correctly when attempting to insert a side chain.
func TestInsertReceiptChainRollback(t *testing.T) {
// Generate forked chain. The returned BlockChain object is used to process the side chain blocks.
tmpChain, sideblocks, canonblocks, err := getLongAndShortChains()
if err != nil {
t.Fatal(err)
}
defer tmpChain.Stop()
// Get the side chain receipts.
if _, err := tmpChain.InsertChain(sideblocks); err != nil {
t.Fatal("processing side chain failed:", err)
}
t.Log("sidechain head:", tmpChain.CurrentBlock().Number(), tmpChain.CurrentBlock().Hash())
sidechainReceipts := make([]types.Receipts, len(sideblocks))
for i, block := range sideblocks {
sidechainReceipts[i] = tmpChain.GetReceiptsByHash(block.Hash())
}
// Get the canon chain receipts.
if _, err := tmpChain.InsertChain(canonblocks); err != nil {
t.Fatal("processing canon chain failed:", err)
}
t.Log("canon head:", tmpChain.CurrentBlock().Number(), tmpChain.CurrentBlock().Hash())
canonReceipts := make([]types.Receipts, len(canonblocks))
for i, block := range canonblocks {
canonReceipts[i] = tmpChain.GetReceiptsByHash(block.Hash())
}
// Import the chain as a ancient-first node and ensure all pointers are updated
// Set up a BlockChain that uses the ancient store.
frdir, err := ioutil.TempDir("", "")
if err != nil {
t.Fatalf("failed to create temp freezer dir: %v", err)
@ -1662,38 +1694,43 @@ func TestIncompleteAncientReceiptChainInsertion(t *testing.T) {
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
gspec := Genesis{Config: params.AllEthashProtocolChanges}
gspec.MustCommit(ancientDb)
ancient, _ := NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil, nil)
defer ancient.Stop()
ancientChain, _ := NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil, nil)
defer ancientChain.Stop()
headers := make([]*types.Header, len(blocks))
for i, block := range blocks {
headers[i] = block.Header()
// Import the canonical header chain.
canonHeaders := make([]*types.Header, len(canonblocks))
for i, block := range canonblocks {
canonHeaders[i] = block.Header()
}
if n, err := ancient.InsertHeaderChain(headers, 1); err != nil {
t.Fatalf("failed to insert header %d: %v", n, err)
if _, err = ancientChain.InsertHeaderChain(canonHeaders, 1); err != nil {
t.Fatal("can't import canon headers:", err)
}
// Abort ancient receipt chain insertion deliberately
ancient.terminateInsert = func(hash common.Hash, number uint64) bool {
return number == blocks[len(blocks)/2].NumberU64()
// Try to insert blocks/receipts of the side chain.
_, err = ancientChain.InsertReceiptChain(sideblocks, sidechainReceipts, uint64(len(sideblocks)))
if err == nil {
t.Fatal("expected error from InsertReceiptChain.")
}
previousFastBlock := ancient.CurrentFastBlock()
if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err == nil {
t.Fatalf("failed to insert receipt %d: %v", n, err)
if ancientChain.CurrentFastBlock().NumberU64() != 0 {
t.Fatalf("failed to rollback ancient data, want %d, have %d", 0, ancientChain.CurrentFastBlock().NumberU64())
}
if ancient.CurrentFastBlock().NumberU64() != previousFastBlock.NumberU64() {
t.Fatalf("failed to rollback ancient data, want %d, have %d", previousFastBlock.NumberU64(), ancient.CurrentFastBlock().NumberU64())
if frozen, err := ancientChain.db.Ancients(); err != nil || frozen != 1 {
t.Fatalf("failed to truncate ancient data, frozen index is %d", frozen)
}
if frozen, err := ancient.db.Ancients(); err != nil || frozen != 1 {
t.Fatalf("failed to truncate ancient data")
// Insert blocks/receipts of the canonical chain.
_, err = ancientChain.InsertReceiptChain(canonblocks, canonReceipts, uint64(len(canonblocks)))
if err != nil {
t.Fatalf("can't import canon chain receipts: %v", err)
}
ancient.terminateInsert = nil
if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err != nil {
t.Fatalf("failed to insert receipt %d: %v", n, err)
}
if ancient.CurrentFastBlock().NumberU64() != blocks[len(blocks)-1].NumberU64() {
if ancientChain.CurrentFastBlock().NumberU64() != canonblocks[len(canonblocks)-1].NumberU64() {
t.Fatalf("failed to insert ancient recept chain after rollback")
}
if frozen, _ := ancientChain.db.Ancients(); frozen != uint64(len(canonblocks))+1 {
t.Fatalf("wrong ancients count %d", frozen)
}
}
// Tests that importing a very large side fork, which is larger than the canon chain,
@ -1958,9 +1995,8 @@ func testInsertKnownChainData(t *testing.T, typ string) {
asserter(t, blocks2[len(blocks2)-1])
}
// getLongAndShortChains returns two chains,
// A is longer, B is heavier
func getLongAndShortChains() (*BlockChain, []*types.Block, []*types.Block, error) {
// getLongAndShortChains returns two chains: A is longer, B is heavier.
func getLongAndShortChains() (bc *BlockChain, longChain []*types.Block, heavyChain []*types.Block, err error) {
// Generate a canonical chain to act as the main dataset
engine := ethash.NewFaker()
db := rawdb.NewMemoryDatabase()
@ -1968,7 +2004,7 @@ func getLongAndShortChains() (*BlockChain, []*types.Block, []*types.Block, error
// Generate and import the canonical chain,
// Offset the time, to keep the difficulty low
longChain, _ := GenerateChain(params.TestChainConfig, genesis, engine, db, 80, func(i int, b *BlockGen) {
longChain, _ = GenerateChain(params.TestChainConfig, genesis, engine, db, 80, func(i int, b *BlockGen) {
b.SetCoinbase(common.Address{1})
})
diskdb := rawdb.NewMemoryDatabase()
@ -1982,10 +2018,13 @@ func getLongAndShortChains() (*BlockChain, []*types.Block, []*types.Block, error
// Generate fork chain, make it shorter than canon, with common ancestor pretty early
parentIndex := 3
parent := longChain[parentIndex]
heavyChain, _ := GenerateChain(params.TestChainConfig, parent, engine, db, 75, func(i int, b *BlockGen) {
heavyChainExt, _ := GenerateChain(params.TestChainConfig, parent, engine, db, 75, func(i int, b *BlockGen) {
b.SetCoinbase(common.Address{2})
b.OffsetTime(-9)
})
heavyChain = append(heavyChain, longChain[:parentIndex+1]...)
heavyChain = append(heavyChain, heavyChainExt...)
// Verify that the test is sane
var (
longerTd = new(big.Int)

View File

@ -31,6 +31,8 @@ var (
// ErrNoGenesis is returned when there is no Genesis Block.
ErrNoGenesis = errors.New("genesis not found in chain")
errSideChainReceipts = errors.New("side blocks can't be accepted as ancient chain data")
)
// List of evm-call-message pre-checking errors. All state transition messages will

View File

@ -19,6 +19,7 @@ package rawdb
import (
"bytes"
"encoding/binary"
"fmt"
"math/big"
"sort"
@ -81,6 +82,37 @@ func ReadAllHashes(db ethdb.Iteratee, number uint64) []common.Hash {
return hashes
}
type NumberHash struct {
Number uint64
Hash common.Hash
}
// ReadAllHashes retrieves all the hashes assigned to blocks at a certain heights,
// both canonical and reorged forks included.
// This method considers both limits to be _inclusive_.
func ReadAllHashesInRange(db ethdb.Iteratee, first, last uint64) []*NumberHash {
var (
start = encodeBlockNumber(first)
keyLength = len(headerPrefix) + 8 + 32
hashes = make([]*NumberHash, 0, 1+last-first)
it = db.NewIterator(headerPrefix, start)
)
defer it.Release()
for it.Next() {
key := it.Key()
if len(key) != keyLength {
continue
}
num := binary.BigEndian.Uint64(key[len(headerPrefix) : len(headerPrefix)+8])
if num > last {
break
}
hash := common.BytesToHash(key[len(key)-32:])
hashes = append(hashes, &NumberHash{num, hash})
}
return hashes
}
// ReadAllCanonicalHashes retrieves all canonical number and hash mappings at the
// certain chain range. If the accumulated entries reaches the given threshold,
// abort the iteration and return the semi-finish result.
@ -656,34 +688,48 @@ func WriteBlock(db ethdb.KeyValueWriter, block *types.Block) {
}
// WriteAncientBlock writes entire block data into ancient store and returns the total written size.
func WriteAncientBlock(db ethdb.AncientWriter, block *types.Block, receipts types.Receipts, td *big.Int) int {
// Encode all block components to RLP format.
headerBlob, err := rlp.EncodeToBytes(block.Header())
if err != nil {
log.Crit("Failed to RLP encode block header", "err", err)
func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts []types.Receipts, td *big.Int) (int64, error) {
var (
tdSum = new(big.Int).Set(td)
stReceipts []*types.ReceiptForStorage
)
return db.ModifyAncients(func(op ethdb.AncientWriteOp) error {
for i, block := range blocks {
// Convert receipts to storage format and sum up total difficulty.
stReceipts = stReceipts[:0]
for _, receipt := range receipts[i] {
stReceipts = append(stReceipts, (*types.ReceiptForStorage)(receipt))
}
bodyBlob, err := rlp.EncodeToBytes(block.Body())
if err != nil {
log.Crit("Failed to RLP encode body", "err", err)
header := block.Header()
if i > 0 {
tdSum.Add(tdSum, header.Difficulty)
}
storageReceipts := make([]*types.ReceiptForStorage, len(receipts))
for i, receipt := range receipts {
storageReceipts[i] = (*types.ReceiptForStorage)(receipt)
if err := writeAncientBlock(op, block, header, stReceipts, tdSum); err != nil {
return err
}
receiptBlob, err := rlp.EncodeToBytes(storageReceipts)
if err != nil {
log.Crit("Failed to RLP encode block receipts", "err", err)
}
tdBlob, err := rlp.EncodeToBytes(td)
if err != nil {
log.Crit("Failed to RLP encode block total difficulty", "err", err)
return nil
})
}
func writeAncientBlock(op ethdb.AncientWriteOp, block *types.Block, header *types.Header, receipts []*types.ReceiptForStorage, td *big.Int) error {
num := block.NumberU64()
if err := op.AppendRaw(freezerHashTable, num, block.Hash().Bytes()); err != nil {
return fmt.Errorf("can't add block %d hash: %v", num, err)
}
// Write all blob to flatten files.
err = db.AppendAncient(block.NumberU64(), block.Hash().Bytes(), headerBlob, bodyBlob, receiptBlob, tdBlob)
if err != nil {
log.Crit("Failed to write block data to ancient store", "err", err)
if err := op.Append(freezerHeaderTable, num, header); err != nil {
return fmt.Errorf("can't append block header %d: %v", num, err)
}
return len(headerBlob) + len(bodyBlob) + len(receiptBlob) + len(tdBlob) + common.HashLength
if err := op.Append(freezerBodiesTable, num, block.Body()); err != nil {
return fmt.Errorf("can't append block body %d: %v", num, err)
}
if err := op.Append(freezerReceiptTable, num, receipts); err != nil {
return fmt.Errorf("can't append block %d receipts: %v", num, err)
}
if err := op.Append(freezerDifficultyTable, num, td); err != nil {
return fmt.Errorf("can't append block %d total difficulty: %v", num, err)
}
return nil
}
// DeleteBlock removes all block data associated with a hash.

View File

@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"golang.org/x/crypto/sha3"
@ -438,7 +439,7 @@ func TestAncientStorage(t *testing.T) {
if err != nil {
t.Fatalf("failed to create temp freezer dir: %v", err)
}
defer os.Remove(frdir)
defer os.RemoveAll(frdir)
db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), frdir, "", false)
if err != nil {
@ -467,8 +468,10 @@ func TestAncientStorage(t *testing.T) {
if blob := ReadTdRLP(db, hash, number); len(blob) > 0 {
t.Fatalf("non existent td returned")
}
// Write and verify the header in the database
WriteAncientBlock(db, block, nil, big.NewInt(100))
WriteAncientBlocks(db, []*types.Block{block}, []types.Receipts{nil}, big.NewInt(100))
if blob := ReadHeaderRLP(db, hash, number); len(blob) == 0 {
t.Fatalf("no header returned")
}
@ -481,6 +484,7 @@ func TestAncientStorage(t *testing.T) {
if blob := ReadTdRLP(db, hash, number); len(blob) == 0 {
t.Fatalf("no td returned")
}
// Use a fake hash for data retrieval, nothing should be returned.
fakeHash := common.BytesToHash([]byte{0x01, 0x02, 0x03})
if blob := ReadHeaderRLP(db, fakeHash, number); len(blob) != 0 {
@ -528,3 +532,141 @@ func TestCanonicalHashIteration(t *testing.T) {
}
}
}
func TestHashesInRange(t *testing.T) {
mkHeader := func(number, seq int) *types.Header {
h := types.Header{
Difficulty: new(big.Int),
Number: big.NewInt(int64(number)),
GasLimit: uint64(seq),
}
return &h
}
db := NewMemoryDatabase()
// For each number, write N versions of that particular number
total := 0
for i := 0; i < 15; i++ {
for ii := 0; ii < i; ii++ {
WriteHeader(db, mkHeader(i, ii))
total++
}
}
if have, want := len(ReadAllHashesInRange(db, 10, 10)), 10; have != want {
t.Fatalf("Wrong number of hashes read, want %d, got %d", want, have)
}
if have, want := len(ReadAllHashesInRange(db, 10, 9)), 0; have != want {
t.Fatalf("Wrong number of hashes read, want %d, got %d", want, have)
}
if have, want := len(ReadAllHashesInRange(db, 0, 100)), total; have != want {
t.Fatalf("Wrong number of hashes read, want %d, got %d", want, have)
}
if have, want := len(ReadAllHashesInRange(db, 9, 10)), 9+10; have != want {
t.Fatalf("Wrong number of hashes read, want %d, got %d", want, have)
}
if have, want := len(ReadAllHashes(db, 10)), 10; have != want {
t.Fatalf("Wrong number of hashes read, want %d, got %d", want, have)
}
if have, want := len(ReadAllHashes(db, 16)), 0; have != want {
t.Fatalf("Wrong number of hashes read, want %d, got %d", want, have)
}
if have, want := len(ReadAllHashes(db, 1)), 1; have != want {
t.Fatalf("Wrong number of hashes read, want %d, got %d", want, have)
}
}
// This measures the write speed of the WriteAncientBlocks operation.
func BenchmarkWriteAncientBlocks(b *testing.B) {
// Open freezer database.
frdir, err := ioutil.TempDir("", "")
if err != nil {
b.Fatalf("failed to create temp freezer dir: %v", err)
}
defer os.RemoveAll(frdir)
db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), frdir, "", false)
if err != nil {
b.Fatalf("failed to create database with ancient backend")
}
// Create the data to insert. The blocks must have consecutive numbers, so we create
// all of them ahead of time. However, there is no need to create receipts
// individually for each block, just make one batch here and reuse it for all writes.
const batchSize = 128
const blockTxs = 20
allBlocks := makeTestBlocks(b.N, blockTxs)
batchReceipts := makeTestReceipts(batchSize, blockTxs)
b.ResetTimer()
// The benchmark loop writes batches of blocks, but note that the total block count is
// b.N. This means the resulting ns/op measurement is the time it takes to write a
// single block and its associated data.
var td = big.NewInt(55)
var totalSize int64
for i := 0; i < b.N; i += batchSize {
length := batchSize
if i+batchSize > b.N {
length = b.N - i
}
blocks := allBlocks[i : i+length]
receipts := batchReceipts[:length]
writeSize, err := WriteAncientBlocks(db, blocks, receipts, td)
if err != nil {
b.Fatal(err)
}
totalSize += writeSize
}
// Enable MB/s reporting.
b.SetBytes(totalSize / int64(b.N))
}
// makeTestBlocks creates fake blocks for the ancient write benchmark.
func makeTestBlocks(nblock int, txsPerBlock int) []*types.Block {
key, _ := crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
signer := types.LatestSignerForChainID(big.NewInt(8))
// Create transactions.
txs := make([]*types.Transaction, txsPerBlock)
for i := 0; i < len(txs); i++ {
var err error
to := common.Address{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}
txs[i], err = types.SignNewTx(key, signer, &types.LegacyTx{
Nonce: 2,
GasPrice: big.NewInt(30000),
Gas: 0x45454545,
To: &to,
})
if err != nil {
panic(err)
}
}
// Create the blocks.
blocks := make([]*types.Block, nblock)
for i := 0; i < nblock; i++ {
header := &types.Header{
Number: big.NewInt(int64(i)),
Extra: []byte("test block"),
}
blocks[i] = types.NewBlockWithHeader(header).WithBody(txs, nil)
blocks[i].Hash() // pre-cache the block hash
}
return blocks
}
// makeTestReceipts creates fake receipts for the ancient write benchmark.
func makeTestReceipts(n int, nPerBlock int) []types.Receipts {
receipts := make([]*types.Receipt, nPerBlock)
for i := 0; i < len(receipts); i++ {
receipts[i] = &types.Receipt{
Status: types.ReceiptStatusSuccessful,
CumulativeGasUsed: 0x888888888,
Logs: make([]*types.Log, 5),
}
}
allReceipts := make([]types.Receipts, n)
for i := 0; i < n; i++ {
allReceipts[i] = receipts
}
return allReceipts
}

View File

@ -104,9 +104,9 @@ func (db *nofreezedb) AncientSize(kind string) (uint64, error) {
return 0, errNotSupported
}
// AppendAncient returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) AppendAncient(number uint64, hash, header, body, receipts, td []byte) error {
return errNotSupported
// ModifyAncients is not supported.
func (db *nofreezedb) ModifyAncients(func(ethdb.AncientWriteOp) error) (int64, error) {
return 0, errNotSupported
}
// TruncateAncients returns an error as we don't have a backing chain freezer.
@ -122,9 +122,7 @@ func (db *nofreezedb) Sync() error {
// NewDatabase creates a high level database on top of a given key-value data
// store without a freezer moving immutable chain segments into cold storage.
func NewDatabase(db ethdb.KeyValueStore) ethdb.Database {
return &nofreezedb{
KeyValueStore: db,
}
return &nofreezedb{KeyValueStore: db}
}
// NewDatabaseWithFreezer creates a high level database on top of a given key-
@ -132,7 +130,7 @@ func NewDatabase(db ethdb.KeyValueStore) ethdb.Database {
// storage.
func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace string, readonly bool) (ethdb.Database, error) {
// Create the idle freezer instance
frdb, err := newFreezer(freezer, namespace, readonly)
frdb, err := newFreezer(freezer, namespace, readonly, freezerTableSize, FreezerNoSnappy)
if err != nil {
return nil, err
}

View File

@ -61,6 +61,9 @@ const (
// freezerBatchLimit is the maximum number of blocks to freeze in one batch
// before doing an fsync and deleting it from the key-value store.
freezerBatchLimit = 30000
// freezerTableSize defines the maximum size of freezer data files.
freezerTableSize = 2 * 1000 * 1000 * 1000
)
// freezer is an memory mapped append-only database to store immutable chain data
@ -77,6 +80,10 @@ type freezer struct {
frozen uint64 // Number of blocks already frozen
threshold uint64 // Number of recent blocks not to freeze (params.FullImmutabilityThreshold apart from tests)
// This lock synchronizes writers and the truncate operation.
writeLock sync.Mutex
writeBatch *freezerBatch
readonly bool
tables map[string]*freezerTable // Data tables for storing everything
instanceLock fileutil.Releaser // File-system lock to prevent double opens
@ -90,7 +97,10 @@ type freezer struct {
// newFreezer creates a chain freezer that moves ancient chain data into
// append-only flat file containers.
func newFreezer(datadir string, namespace string, readonly bool) (*freezer, error) {
//
// The 'tables' argument defines the data tables. If the value of a map
// entry is true, snappy compression is disabled for the table.
func newFreezer(datadir string, namespace string, readonly bool, maxTableSize uint32, tables map[string]bool) (*freezer, error) {
// Create the initial freezer object
var (
readMeter = metrics.NewRegisteredMeter(namespace+"ancient/read", nil)
@ -119,8 +129,10 @@ func newFreezer(datadir string, namespace string, readonly bool) (*freezer, erro
trigger: make(chan chan struct{}),
quit: make(chan struct{}),
}
for name, disableSnappy := range FreezerNoSnappy {
table, err := newTable(datadir, name, readMeter, writeMeter, sizeGauge, disableSnappy)
// Create the tables.
for name, disableSnappy := range tables {
table, err := newTable(datadir, name, readMeter, writeMeter, sizeGauge, maxTableSize, disableSnappy)
if err != nil {
for _, table := range freezer.tables {
table.Close()
@ -130,6 +142,8 @@ func newFreezer(datadir string, namespace string, readonly bool) (*freezer, erro
}
freezer.tables[name] = table
}
// Truncate all tables to common length.
if err := freezer.repair(); err != nil {
for _, table := range freezer.tables {
table.Close()
@ -137,12 +151,19 @@ func newFreezer(datadir string, namespace string, readonly bool) (*freezer, erro
lock.Release()
return nil, err
}
// Create the write batch.
freezer.writeBatch = newFreezerBatch(freezer)
log.Info("Opened ancient database", "database", datadir, "readonly", readonly)
return freezer, nil
}
// Close terminates the chain freezer, unmapping all the data files.
func (f *freezer) Close() error {
f.writeLock.Lock()
defer f.writeLock.Unlock()
var errs []error
f.closeOnce.Do(func() {
close(f.quit)
@ -199,60 +220,49 @@ func (f *freezer) Ancients() (uint64, error) {
// AncientSize returns the ancient size of the specified category.
func (f *freezer) AncientSize(kind string) (uint64, error) {
// This needs the write lock to avoid data races on table fields.
// Speed doesn't matter here, AncientSize is for debugging.
f.writeLock.Lock()
defer f.writeLock.Unlock()
if table := f.tables[kind]; table != nil {
return table.size()
}
return 0, errUnknownTable
}
// AppendAncient injects all binary blobs belong to block at the end of the
// append-only immutable table files.
//
// Notably, this function is lock free but kind of thread-safe. All out-of-order
// injection will be rejected. But if two injections with same number happen at
// the same time, we can get into the trouble.
func (f *freezer) AppendAncient(number uint64, hash, header, body, receipts, td []byte) (err error) {
// ModifyAncients runs the given write operation.
func (f *freezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize int64, err error) {
if f.readonly {
return errReadOnly
return 0, errReadOnly
}
// Ensure the binary blobs we are appending is continuous with freezer.
if atomic.LoadUint64(&f.frozen) != number {
return errOutOrderInsertion
}
// Rollback all inserted data if any insertion below failed to ensure
// the tables won't out of sync.
f.writeLock.Lock()
defer f.writeLock.Unlock()
// Roll back all tables to the starting position in case of error.
prevItem := f.frozen
defer func() {
if err != nil {
rerr := f.repair()
if rerr != nil {
log.Crit("Failed to repair freezer", "err", rerr)
// The write operation has failed. Go back to the previous item position.
for name, table := range f.tables {
err := table.truncate(prevItem)
if err != nil {
log.Error("Freezer table roll-back failed", "table", name, "index", prevItem, "err", err)
}
}
log.Info("Append ancient failed", "number", number, "err", err)
}
}()
// Inject all the components into the relevant data tables
if err := f.tables[freezerHashTable].Append(f.frozen, hash[:]); err != nil {
log.Error("Failed to append ancient hash", "number", f.frozen, "hash", hash, "err", err)
return err
f.writeBatch.reset()
if err := fn(f.writeBatch); err != nil {
return 0, err
}
if err := f.tables[freezerHeaderTable].Append(f.frozen, header); err != nil {
log.Error("Failed to append ancient header", "number", f.frozen, "hash", hash, "err", err)
return err
item, writeSize, err := f.writeBatch.commit()
if err != nil {
return 0, err
}
if err := f.tables[freezerBodiesTable].Append(f.frozen, body); err != nil {
log.Error("Failed to append ancient body", "number", f.frozen, "hash", hash, "err", err)
return err
}
if err := f.tables[freezerReceiptTable].Append(f.frozen, receipts); err != nil {
log.Error("Failed to append ancient receipts", "number", f.frozen, "hash", hash, "err", err)
return err
}
if err := f.tables[freezerDifficultyTable].Append(f.frozen, td); err != nil {
log.Error("Failed to append ancient difficulty", "number", f.frozen, "hash", hash, "err", err)
return err
}
atomic.AddUint64(&f.frozen, 1) // Only modify atomically
return nil
atomic.StoreUint64(&f.frozen, item)
return writeSize, nil
}
// TruncateAncients discards any recent data above the provided threshold number.
@ -260,6 +270,9 @@ func (f *freezer) TruncateAncients(items uint64) error {
if f.readonly {
return errReadOnly
}
f.writeLock.Lock()
defer f.writeLock.Unlock()
if atomic.LoadUint64(&f.frozen) <= items {
return nil
}
@ -286,6 +299,24 @@ func (f *freezer) Sync() error {
return nil
}
// repair truncates all data tables to the same length.
func (f *freezer) repair() error {
min := uint64(math.MaxUint64)
for _, table := range f.tables {
items := atomic.LoadUint64(&table.items)
if min > items {
min = items
}
}
for _, table := range f.tables {
if err := table.truncate(min); err != nil {
return err
}
}
atomic.StoreUint64(&f.frozen, min)
return nil
}
// freeze is a background thread that periodically checks the blockchain for any
// import progress and moves ancient data from the fast database into the freezer.
//
@ -352,54 +383,28 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) {
backoff = true
continue
}
// Seems we have data ready to be frozen, process in usable batches
limit := *number - threshold
if limit-f.frozen > freezerBatchLimit {
limit = f.frozen + freezerBatchLimit
}
var (
start = time.Now()
first = f.frozen
ancients = make([]common.Hash, 0, limit-f.frozen)
first, _ = f.Ancients()
limit = *number - threshold
)
for f.frozen <= limit {
// Retrieves all the components of the canonical block
hash := ReadCanonicalHash(nfdb, f.frozen)
if hash == (common.Hash{}) {
log.Error("Canonical hash missing, can't freeze", "number", f.frozen)
break
if limit-first > freezerBatchLimit {
limit = first + freezerBatchLimit
}
header := ReadHeaderRLP(nfdb, hash, f.frozen)
if len(header) == 0 {
log.Error("Block header missing, can't freeze", "number", f.frozen, "hash", hash)
break
}
body := ReadBodyRLP(nfdb, hash, f.frozen)
if len(body) == 0 {
log.Error("Block body missing, can't freeze", "number", f.frozen, "hash", hash)
break
}
receipts := ReadReceiptsRLP(nfdb, hash, f.frozen)
if len(receipts) == 0 {
log.Error("Block receipts missing, can't freeze", "number", f.frozen, "hash", hash)
break
}
td := ReadTdRLP(nfdb, hash, f.frozen)
if len(td) == 0 {
log.Error("Total difficulty missing, can't freeze", "number", f.frozen, "hash", hash)
break
}
log.Trace("Deep froze ancient block", "number", f.frozen, "hash", hash)
// Inject all the components into the relevant data tables
if err := f.AppendAncient(f.frozen, hash[:], header, body, receipts, td); err != nil {
break
}
ancients = append(ancients, hash)
ancients, err := f.freezeRange(nfdb, first, limit)
if err != nil {
log.Error("Error in block freeze operation", "err", err)
backoff = true
continue
}
// Batch of blocks have been frozen, flush them before wiping from leveldb
if err := f.Sync(); err != nil {
log.Crit("Failed to flush frozen tables", "err", err)
}
// Wipe out all data from the active database
batch := db.NewBatch()
for i := 0; i < len(ancients); i++ {
@ -464,6 +469,7 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) {
log.Crit("Failed to delete dangling side blocks", "err", err)
}
}
// Log something friendly for the user
context := []interface{}{
"blocks", f.frozen - first, "elapsed", common.PrettyDuration(time.Since(start)), "number", f.frozen - 1,
@ -480,20 +486,54 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) {
}
}
// repair truncates all data tables to the same length.
func (f *freezer) repair() error {
min := uint64(math.MaxUint64)
for _, table := range f.tables {
items := atomic.LoadUint64(&table.items)
if min > items {
min = items
func (f *freezer) freezeRange(nfdb *nofreezedb, number, limit uint64) (hashes []common.Hash, err error) {
hashes = make([]common.Hash, 0, limit-number)
_, err = f.ModifyAncients(func(op ethdb.AncientWriteOp) error {
for ; number <= limit; number++ {
// Retrieve all the components of the canonical block.
hash := ReadCanonicalHash(nfdb, number)
if hash == (common.Hash{}) {
return fmt.Errorf("canonical hash missing, can't freeze block %d", number)
}
header := ReadHeaderRLP(nfdb, hash, number)
if len(header) == 0 {
return fmt.Errorf("block header missing, can't freeze block %d", number)
}
for _, table := range f.tables {
if err := table.truncate(min); err != nil {
return err
body := ReadBodyRLP(nfdb, hash, number)
if len(body) == 0 {
return fmt.Errorf("block body missing, can't freeze block %d", number)
}
receipts := ReadReceiptsRLP(nfdb, hash, number)
if len(receipts) == 0 {
return fmt.Errorf("block receipts missing, can't freeze block %d", number)
}
td := ReadTdRLP(nfdb, hash, number)
if len(td) == 0 {
return fmt.Errorf("total difficulty missing, can't freeze block %d", number)
}
// Write to the batch.
if err := op.AppendRaw(freezerHashTable, number, hash[:]); err != nil {
return fmt.Errorf("can't write hash to freezer: %v", err)
}
if err := op.AppendRaw(freezerHeaderTable, number, header); err != nil {
return fmt.Errorf("can't write header to freezer: %v", err)
}
if err := op.AppendRaw(freezerBodiesTable, number, body); err != nil {
return fmt.Errorf("can't write body to freezer: %v", err)
}
if err := op.AppendRaw(freezerReceiptTable, number, receipts); err != nil {
return fmt.Errorf("can't write receipts to freezer: %v", err)
}
if err := op.AppendRaw(freezerDifficultyTable, number, td); err != nil {
return fmt.Errorf("can't write td to freezer: %v", err)
}
hashes = append(hashes, hash)
}
atomic.StoreUint64(&f.frozen, min)
return nil
})
return hashes, err
}

248
core/rawdb/freezer_batch.go Normal file
View File

@ -0,0 +1,248 @@
// Copyright 2021 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package rawdb
import (
"fmt"
"sync/atomic"
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/rlp"
"github.com/golang/snappy"
)
// This is the maximum amount of data that will be buffered in memory
// for a single freezer table batch.
const freezerBatchBufferLimit = 2 * 1024 * 1024
// freezerBatch is a write operation of multiple items on a freezer.
type freezerBatch struct {
tables map[string]*freezerTableBatch
}
func newFreezerBatch(f *freezer) *freezerBatch {
batch := &freezerBatch{tables: make(map[string]*freezerTableBatch, len(f.tables))}
for kind, table := range f.tables {
batch.tables[kind] = table.newBatch()
}
return batch
}
// Append adds an RLP-encoded item of the given kind.
func (batch *freezerBatch) Append(kind string, num uint64, item interface{}) error {
return batch.tables[kind].Append(num, item)
}
// AppendRaw adds an item of the given kind.
func (batch *freezerBatch) AppendRaw(kind string, num uint64, item []byte) error {
return batch.tables[kind].AppendRaw(num, item)
}
// reset initializes the batch.
func (batch *freezerBatch) reset() {
for _, tb := range batch.tables {
tb.reset()
}
}
// commit is called at the end of a write operation and
// writes all remaining data to tables.
func (batch *freezerBatch) commit() (item uint64, writeSize int64, err error) {
// Check that count agrees on all batches.
item = uint64(math.MaxUint64)
for name, tb := range batch.tables {
if item < math.MaxUint64 && tb.curItem != item {
return 0, 0, fmt.Errorf("table %s is at item %d, want %d", name, tb.curItem, item)
}
item = tb.curItem
}
// Commit all table batches.
for _, tb := range batch.tables {
if err := tb.commit(); err != nil {
return 0, 0, err
}
writeSize += tb.totalBytes
}
return item, writeSize, nil
}
// freezerTableBatch is a batch for a freezer table.
type freezerTableBatch struct {
t *freezerTable
sb *snappyBuffer
encBuffer writeBuffer
dataBuffer []byte
indexBuffer []byte
curItem uint64 // expected index of next append
totalBytes int64 // counts written bytes since reset
}
// newBatch creates a new batch for the freezer table.
func (t *freezerTable) newBatch() *freezerTableBatch {
batch := &freezerTableBatch{t: t}
if !t.noCompression {
batch.sb = new(snappyBuffer)
}
batch.reset()
return batch
}
// reset clears the batch for reuse.
func (batch *freezerTableBatch) reset() {
batch.dataBuffer = batch.dataBuffer[:0]
batch.indexBuffer = batch.indexBuffer[:0]
batch.curItem = atomic.LoadUint64(&batch.t.items)
batch.totalBytes = 0
}
// Append rlp-encodes and adds data at the end of the freezer table. The item number is a
// precautionary parameter to ensure data correctness, but the table will reject already
// existing data.
func (batch *freezerTableBatch) Append(item uint64, data interface{}) error {
if item != batch.curItem {
return errOutOrderInsertion
}
// Encode the item.
batch.encBuffer.Reset()
if err := rlp.Encode(&batch.encBuffer, data); err != nil {
return err
}
encItem := batch.encBuffer.data
if batch.sb != nil {
encItem = batch.sb.compress(encItem)
}
return batch.appendItem(encItem)
}
// AppendRaw injects a binary blob at the end of the freezer table. The item number is a
// precautionary parameter to ensure data correctness, but the table will reject already
// existing data.
func (batch *freezerTableBatch) AppendRaw(item uint64, blob []byte) error {
if item != batch.curItem {
return errOutOrderInsertion
}
encItem := blob
if batch.sb != nil {
encItem = batch.sb.compress(blob)
}
return batch.appendItem(encItem)
}
func (batch *freezerTableBatch) appendItem(data []byte) error {
// Check if item fits into current data file.
itemSize := int64(len(data))
itemOffset := batch.t.headBytes + int64(len(batch.dataBuffer))
if itemOffset+itemSize > int64(batch.t.maxFileSize) {
// It doesn't fit, go to next file first.
if err := batch.commit(); err != nil {
return err
}
if err := batch.t.advanceHead(); err != nil {
return err
}
itemOffset = 0
}
// Put data to buffer.
batch.dataBuffer = append(batch.dataBuffer, data...)
batch.totalBytes += itemSize
// Put index entry to buffer.
entry := indexEntry{filenum: batch.t.headId, offset: uint32(itemOffset + itemSize)}
batch.indexBuffer = entry.append(batch.indexBuffer)
batch.curItem++
return batch.maybeCommit()
}
// maybeCommit writes the buffered data if the buffer is full enough.
func (batch *freezerTableBatch) maybeCommit() error {
if len(batch.dataBuffer) > freezerBatchBufferLimit {
return batch.commit()
}
return nil
}
// commit writes the batched items to the backing freezerTable.
func (batch *freezerTableBatch) commit() error {
// Write data.
_, err := batch.t.head.Write(batch.dataBuffer)
if err != nil {
return err
}
dataSize := int64(len(batch.dataBuffer))
batch.dataBuffer = batch.dataBuffer[:0]
// Write index.
_, err = batch.t.index.Write(batch.indexBuffer)
if err != nil {
return err
}
indexSize := int64(len(batch.indexBuffer))
batch.indexBuffer = batch.indexBuffer[:0]
// Update headBytes of table.
batch.t.headBytes += dataSize
atomic.StoreUint64(&batch.t.items, batch.curItem)
// Update metrics.
batch.t.sizeGauge.Inc(dataSize + indexSize)
batch.t.writeMeter.Mark(dataSize + indexSize)
return nil
}
// snappyBuffer writes snappy in block format, and can be reused. It is
// reset when WriteTo is called.
type snappyBuffer struct {
dst []byte
}
// compress snappy-compresses the data.
func (s *snappyBuffer) compress(data []byte) []byte {
// The snappy library does not care what the capacity of the buffer is,
// but only checks the length. If the length is too small, it will
// allocate a brand new buffer.
// To avoid that, we check the required size here, and grow the size of the
// buffer to utilize the full capacity.
if n := snappy.MaxEncodedLen(len(data)); len(s.dst) < n {
if cap(s.dst) < n {
s.dst = make([]byte, n)
}
s.dst = s.dst[:n]
}
s.dst = snappy.Encode(s.dst, data)
return s.dst
}
// writeBuffer implements io.Writer for a byte slice.
type writeBuffer struct {
data []byte
}
func (wb *writeBuffer) Write(data []byte) (int, error) {
wb.data = append(wb.data, data...)
return len(data), nil
}
func (wb *writeBuffer) Reset() {
wb.data = wb.data[:0]
}

View File

@ -17,6 +17,7 @@
package rawdb
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
@ -55,19 +56,20 @@ type indexEntry struct {
const indexEntrySize = 6
// unmarshallBinary deserializes binary b into the rawIndex entry.
// unmarshalBinary deserializes binary b into the rawIndex entry.
func (i *indexEntry) unmarshalBinary(b []byte) error {
i.filenum = uint32(binary.BigEndian.Uint16(b[:2]))
i.offset = binary.BigEndian.Uint32(b[2:6])
return nil
}
// marshallBinary serializes the rawIndex entry into binary.
func (i *indexEntry) marshallBinary() []byte {
b := make([]byte, indexEntrySize)
binary.BigEndian.PutUint16(b[:2], uint16(i.filenum))
binary.BigEndian.PutUint32(b[2:6], i.offset)
return b
// append adds the encoded entry to the end of b.
func (i *indexEntry) append(b []byte) []byte {
offset := len(b)
out := append(b, make([]byte, indexEntrySize)...)
binary.BigEndian.PutUint16(out[offset:], uint16(i.filenum))
binary.BigEndian.PutUint32(out[offset+2:], i.offset)
return out
}
// bounds returns the start- and end- offsets, and the file number of where to
@ -107,7 +109,7 @@ type freezerTable struct {
// to count how many historic items have gone missing.
itemOffset uint32 // Offset (number of discarded items)
headBytes uint32 // Number of bytes written to the head file
headBytes int64 // Number of bytes written to the head file
readMeter metrics.Meter // Meter for measuring the effective amount of data read
writeMeter metrics.Meter // Meter for measuring the effective amount of data written
sizeGauge metrics.Gauge // Gauge for tracking the combined size of all freezer tables
@ -118,12 +120,7 @@ type freezerTable struct {
// NewFreezerTable opens the given path as a freezer table.
func NewFreezerTable(path, name string, disableSnappy bool) (*freezerTable, error) {
return newTable(path, name, metrics.NilMeter{}, metrics.NilMeter{}, metrics.NilGauge{}, disableSnappy)
}
// newTable opens a freezer table with default settings - 2G files
func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeGauge metrics.Gauge, disableSnappy bool) (*freezerTable, error) {
return newCustomTable(path, name, readMeter, writeMeter, sizeGauge, 2*1000*1000*1000, disableSnappy)
return newTable(path, name, metrics.NilMeter{}, metrics.NilMeter{}, metrics.NilGauge{}, freezerTableSize, disableSnappy)
}
// openFreezerFileForAppend opens a freezer table file and seeks to the end
@ -164,10 +161,10 @@ func truncateFreezerFile(file *os.File, size int64) error {
return nil
}
// newCustomTable opens a freezer table, creating the data and index files if they are
// newTable opens a freezer table, creating the data and index files if they are
// non existent. Both files are truncated to the shortest common length to ensure
// they don't go out of sync.
func newCustomTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeGauge metrics.Gauge, maxFilesize uint32, noCompression bool) (*freezerTable, error) {
func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeGauge metrics.Gauge, maxFilesize uint32, noCompression bool) (*freezerTable, error) {
// Ensure the containing directory exists and open the indexEntry file
if err := os.MkdirAll(path, 0755); err != nil {
return nil, err
@ -313,7 +310,7 @@ func (t *freezerTable) repair() error {
}
// Update the item and byte counters and return
t.items = uint64(t.itemOffset) + uint64(offsetsSize/indexEntrySize-1) // last indexEntry points to the end of the data file
t.headBytes = uint32(contentSize)
t.headBytes = contentSize
t.headId = lastIndex.filenum
// Close opened files and preopen all files
@ -387,14 +384,14 @@ func (t *freezerTable) truncate(items uint64) error {
t.releaseFilesAfter(expected.filenum, true)
// Set back the historic head
t.head = newHead
atomic.StoreUint32(&t.headId, expected.filenum)
t.headId = expected.filenum
}
if err := truncateFreezerFile(t.head, int64(expected.offset)); err != nil {
return err
}
// All data files truncated, set internal counters and return
t.headBytes = int64(expected.offset)
atomic.StoreUint64(&t.items, items)
atomic.StoreUint32(&t.headBytes, expected.offset)
// Retrieve the new size and update the total size counter
newSize, err := t.sizeNolock()
@ -471,94 +468,6 @@ func (t *freezerTable) releaseFilesAfter(num uint32, remove bool) {
}
}
// Append injects a binary blob at the end of the freezer table. The item number
// is a precautionary parameter to ensure data correctness, but the table will
// reject already existing data.
//
// Note, this method will *not* flush any data to disk so be sure to explicitly
// fsync before irreversibly deleting data from the database.
func (t *freezerTable) Append(item uint64, blob []byte) error {
// Encode the blob before the lock portion
if !t.noCompression {
blob = snappy.Encode(nil, blob)
}
// Read lock prevents competition with truncate
retry, err := t.append(item, blob, false)
if err != nil {
return err
}
if retry {
// Read lock was insufficient, retry with a writelock
_, err = t.append(item, blob, true)
}
return err
}
// append injects a binary blob at the end of the freezer table.
// Normally, inserts do not require holding the write-lock, so it should be invoked with 'wlock' set to
// false.
// However, if the data will grown the current file out of bounds, then this
// method will return 'true, nil', indicating that the caller should retry, this time
// with 'wlock' set to true.
func (t *freezerTable) append(item uint64, encodedBlob []byte, wlock bool) (bool, error) {
if wlock {
t.lock.Lock()
defer t.lock.Unlock()
} else {
t.lock.RLock()
defer t.lock.RUnlock()
}
// Ensure the table is still accessible
if t.index == nil || t.head == nil {
return false, errClosed
}
// Ensure only the next item can be written, nothing else
if atomic.LoadUint64(&t.items) != item {
return false, fmt.Errorf("appending unexpected item: want %d, have %d", t.items, item)
}
bLen := uint32(len(encodedBlob))
if t.headBytes+bLen < bLen ||
t.headBytes+bLen > t.maxFileSize {
// Writing would overflow, so we need to open a new data file.
// If we don't already hold the writelock, abort and let the caller
// invoke this method a second time.
if !wlock {
return true, nil
}
nextID := atomic.LoadUint32(&t.headId) + 1
// We open the next file in truncated mode -- if this file already
// exists, we need to start over from scratch on it
newHead, err := t.openFile(nextID, openFreezerFileTruncated)
if err != nil {
return false, err
}
// Close old file, and reopen in RDONLY mode
t.releaseFile(t.headId)
t.openFile(t.headId, openFreezerFileForReadOnly)
// Swap out the current head
t.head = newHead
atomic.StoreUint32(&t.headBytes, 0)
atomic.StoreUint32(&t.headId, nextID)
}
if _, err := t.head.Write(encodedBlob); err != nil {
return false, err
}
newOffset := atomic.AddUint32(&t.headBytes, bLen)
idx := indexEntry{
filenum: atomic.LoadUint32(&t.headId),
offset: newOffset,
}
// Write indexEntry
t.index.Write(idx.marshallBinary())
t.writeMeter.Mark(int64(bLen + indexEntrySize))
t.sizeGauge.Inc(int64(bLen + indexEntrySize))
atomic.AddUint64(&t.items, 1)
return false, nil
}
// getIndices returns the index entries for the given from-item, covering 'count' items.
// N.B: The actual number of returned indices for N items will always be N+1 (unless an
// error is returned).
@ -651,6 +560,7 @@ func (t *freezerTable) RetrieveItems(start, count, maxBytes uint64) ([][]byte, e
func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []int, error) {
t.lock.RLock()
defer t.lock.RUnlock()
// Ensure the table and the item is accessible
if t.index == nil || t.head == nil {
return nil, nil, errClosed
@ -763,6 +673,32 @@ func (t *freezerTable) sizeNolock() (uint64, error) {
return total, nil
}
// advanceHead should be called when the current head file would outgrow the file limits,
// and a new file must be opened. The caller of this method must hold the write-lock
// before calling this method.
func (t *freezerTable) advanceHead() error {
t.lock.Lock()
defer t.lock.Unlock()
// We open the next file in truncated mode -- if this file already
// exists, we need to start over from scratch on it.
nextID := t.headId + 1
newHead, err := t.openFile(nextID, openFreezerFileTruncated)
if err != nil {
return err
}
// Close old file, and reopen in RDONLY mode.
t.releaseFile(t.headId)
t.openFile(t.headId, openFreezerFileForReadOnly)
// Swap out the current head.
t.head = newHead
t.headBytes = 0
t.headId = nextID
return nil
}
// Sync pushes any pending data from memory out to disk. This is an expensive
// operation, so use it with care.
func (t *freezerTable) Sync() error {
@ -775,10 +711,21 @@ func (t *freezerTable) Sync() error {
// DumpIndex is a debug print utility function, mainly for testing. It can also
// be used to analyse a live freezer table index.
func (t *freezerTable) DumpIndex(start, stop int64) {
t.dumpIndex(os.Stdout, start, stop)
}
func (t *freezerTable) dumpIndexString(start, stop int64) string {
var out bytes.Buffer
out.WriteString("\n")
t.dumpIndex(&out, start, stop)
return out.String()
}
func (t *freezerTable) dumpIndex(w io.Writer, start, stop int64) {
buf := make([]byte, indexEntrySize)
fmt.Printf("| number | fileno | offset |\n")
fmt.Printf("|--------|--------|--------|\n")
fmt.Fprintf(w, "| number | fileno | offset |\n")
fmt.Fprintf(w, "|--------|--------|--------|\n")
for i := uint64(start); ; i++ {
if _, err := t.index.ReadAt(buf, int64(i*indexEntrySize)); err != nil {
@ -786,10 +733,10 @@ func (t *freezerTable) DumpIndex(start, stop int64) {
}
var entry indexEntry
entry.unmarshalBinary(buf)
fmt.Printf("| %03d | %03d | %03d | \n", i, entry.filenum, entry.offset)
fmt.Fprintf(w, "| %03d | %03d | %03d | \n", i, entry.filenum, entry.offset)
if stop > 0 && i >= uint64(stop) {
break
}
}
fmt.Printf("|--------------------------|\n")
fmt.Fprintf(w, "|--------------------------|\n")
}

View File

@ -18,49 +18,36 @@ package rawdb
import (
"bytes"
"encoding/binary"
"fmt"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/metrics"
"github.com/stretchr/testify/require"
)
func init() {
rand.Seed(time.Now().Unix())
}
// Gets a chunk of data, filled with 'b'
func getChunk(size int, b int) []byte {
data := make([]byte, size)
for i := range data {
data[i] = byte(b)
}
return data
}
// TestFreezerBasics test initializing a freezertable from scratch, writing to the table,
// and reading it back.
func TestFreezerBasics(t *testing.T) {
t.Parallel()
// set cutoff at 50 bytes
f, err := newCustomTable(os.TempDir(),
f, err := newTable(os.TempDir(),
fmt.Sprintf("unittest-%d", rand.Uint64()),
metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 50, true)
if err != nil {
t.Fatal(err)
}
defer f.Close()
// Write 15 bytes 255 times, results in 85 files
for x := 0; x < 255; x++ {
data := getChunk(15, x)
f.Append(uint64(x), data)
}
writeChunks(t, f, 255, 15)
//print(t, f, 0)
//print(t, f, 1)
@ -98,16 +85,21 @@ func TestFreezerBasicsClosing(t *testing.T) {
f *freezerTable
err error
)
f, err = newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
f, err = newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
// Write 15 bytes 255 times, results in 85 files
// Write 15 bytes 255 times, results in 85 files.
// In-between writes, the table is closed and re-opened.
for x := 0; x < 255; x++ {
data := getChunk(15, x)
f.Append(uint64(x), data)
batch := f.newBatch()
require.NoError(t, batch.AppendRaw(uint64(x), data))
require.NoError(t, batch.commit())
f.Close()
f, err = newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
f, err = newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
@ -124,7 +116,7 @@ func TestFreezerBasicsClosing(t *testing.T) {
t.Fatalf("test %d, got \n%x != \n%x", y, got, exp)
}
f.Close()
f, err = newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
f, err = newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
@ -137,22 +129,22 @@ func TestFreezerRepairDanglingHead(t *testing.T) {
rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge()
fname := fmt.Sprintf("dangling_headtest-%d", rand.Uint64())
{ // Fill table
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
// Fill table
{
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
// Write 15 bytes 255 times
for x := 0; x < 255; x++ {
data := getChunk(15, x)
f.Append(uint64(x), data)
}
writeChunks(t, f, 255, 15)
// The last item should be there
if _, err = f.Retrieve(0xfe); err != nil {
t.Fatal(err)
}
f.Close()
}
// open the index
idxFile, err := os.OpenFile(filepath.Join(os.TempDir(), fmt.Sprintf("%s.ridx", fname)), os.O_RDWR, 0644)
if err != nil {
@ -165,9 +157,10 @@ func TestFreezerRepairDanglingHead(t *testing.T) {
}
idxFile.Truncate(stat.Size() - 4)
idxFile.Close()
// Now open it again
{
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
@ -188,22 +181,22 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) {
rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge()
fname := fmt.Sprintf("dangling_headtest-%d", rand.Uint64())
{ // Fill a table and close it
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
// Fill a table and close it
{
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
// Write 15 bytes 255 times
for x := 0; x < 0xff; x++ {
data := getChunk(15, x)
f.Append(uint64(x), data)
}
writeChunks(t, f, 255, 15)
// The last item should be there
if _, err = f.Retrieve(f.items - 1); err != nil {
t.Fatal(err)
}
f.Close()
}
// open the index
idxFile, err := os.OpenFile(filepath.Join(os.TempDir(), fmt.Sprintf("%s.ridx", fname)), os.O_RDWR, 0644)
if err != nil {
@ -213,9 +206,10 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) {
// 0-indexEntry, 1-indexEntry, corrupt-indexEntry
idxFile.Truncate(indexEntrySize + indexEntrySize + indexEntrySize/2)
idxFile.Close()
// Now open it again
{
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
@ -228,15 +222,17 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) {
t.Errorf("Expected error for missing index entry")
}
// We should now be able to store items again, from item = 1
batch := f.newBatch()
for x := 1; x < 0xff; x++ {
data := getChunk(15, ^x)
f.Append(uint64(x), data)
require.NoError(t, batch.AppendRaw(uint64(x), getChunk(15, ^x)))
}
require.NoError(t, batch.commit())
f.Close()
}
// And if we open it, we should now be able to read all of them (new values)
{
f, _ := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
f, _ := newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
for y := 1; y < 255; y++ {
exp := getChunk(15, ^y)
got, err := f.Retrieve(uint64(y))
@ -255,22 +251,21 @@ func TestSnappyDetection(t *testing.T) {
t.Parallel()
rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge()
fname := fmt.Sprintf("snappytest-%d", rand.Uint64())
// Open with snappy
{
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
// Write 15 bytes 255 times
for x := 0; x < 0xff; x++ {
data := getChunk(15, x)
f.Append(uint64(x), data)
}
writeChunks(t, f, 255, 15)
f.Close()
}
// Open without snappy
{
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, false)
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, false)
if err != nil {
t.Fatal(err)
}
@ -282,7 +277,7 @@ func TestSnappyDetection(t *testing.T) {
// Open with snappy
{
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
@ -292,8 +287,8 @@ func TestSnappyDetection(t *testing.T) {
t.Fatalf("expected no error, got %v", err)
}
}
}
func assertFileSize(f string, size int64) error {
stat, err := os.Stat(f)
if err != nil {
@ -303,7 +298,6 @@ func assertFileSize(f string, size int64) error {
return fmt.Errorf("error, expected size %d, got %d", size, stat.Size())
}
return nil
}
// TestFreezerRepairDanglingIndex checks that if the index has more entries than there are data,
@ -313,16 +307,15 @@ func TestFreezerRepairDanglingIndex(t *testing.T) {
rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge()
fname := fmt.Sprintf("dangling_indextest-%d", rand.Uint64())
{ // Fill a table and close it
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
// Fill a table and close it
{
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
// Write 15 bytes 9 times : 150 bytes
for x := 0; x < 9; x++ {
data := getChunk(15, x)
f.Append(uint64(x), data)
}
writeChunks(t, f, 9, 15)
// The last item should be there
if _, err = f.Retrieve(f.items - 1); err != nil {
f.Close()
@ -331,6 +324,7 @@ func TestFreezerRepairDanglingIndex(t *testing.T) {
f.Close()
// File sizes should be 45, 45, 45 : items[3, 3, 3)
}
// Crop third file
fileToCrop := filepath.Join(os.TempDir(), fmt.Sprintf("%s.0002.rdat", fname))
// Truncate third file: 45 ,45, 20
@ -345,17 +339,18 @@ func TestFreezerRepairDanglingIndex(t *testing.T) {
file.Truncate(20)
file.Close()
}
// Open db it again
// It should restore the file(s) to
// 45, 45, 15
// with 3+3+1 items
{
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
defer f.Close()
if f.items != 7 {
f.Close()
t.Fatalf("expected %d items, got %d", 7, f.items)
}
if err := assertFileSize(fileToCrop, 15); err != nil {
@ -365,30 +360,29 @@ func TestFreezerRepairDanglingIndex(t *testing.T) {
}
func TestFreezerTruncate(t *testing.T) {
t.Parallel()
rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge()
fname := fmt.Sprintf("truncation-%d", rand.Uint64())
{ // Fill table
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
// Fill table
{
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
// Write 15 bytes 30 times
for x := 0; x < 30; x++ {
data := getChunk(15, x)
f.Append(uint64(x), data)
}
writeChunks(t, f, 30, 15)
// The last item should be there
if _, err = f.Retrieve(f.items - 1); err != nil {
t.Fatal(err)
}
f.Close()
}
// Reopen, truncate
{
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
@ -401,9 +395,7 @@ func TestFreezerTruncate(t *testing.T) {
if f.headBytes != 15 {
t.Fatalf("expected %d bytes, got %d", 15, f.headBytes)
}
}
}
// TestFreezerRepairFirstFile tests a head file with the very first item only half-written.
@ -412,20 +404,26 @@ func TestFreezerRepairFirstFile(t *testing.T) {
t.Parallel()
rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge()
fname := fmt.Sprintf("truncationfirst-%d", rand.Uint64())
{ // Fill table
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
// Fill table
{
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
// Write 80 bytes, splitting out into two files
f.Append(0, getChunk(40, 0xFF))
f.Append(1, getChunk(40, 0xEE))
batch := f.newBatch()
require.NoError(t, batch.AppendRaw(0, getChunk(40, 0xFF)))
require.NoError(t, batch.AppendRaw(1, getChunk(40, 0xEE)))
require.NoError(t, batch.commit())
// The last item should be there
if _, err = f.Retrieve(f.items - 1); err != nil {
if _, err = f.Retrieve(1); err != nil {
t.Fatal(err)
}
f.Close()
}
// Truncate the file in half
fileToCrop := filepath.Join(os.TempDir(), fmt.Sprintf("%s.0001.rdat", fname))
{
@ -439,9 +437,10 @@ func TestFreezerRepairFirstFile(t *testing.T) {
file.Truncate(20)
file.Close()
}
// Reopen
{
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
@ -449,9 +448,14 @@ func TestFreezerRepairFirstFile(t *testing.T) {
f.Close()
t.Fatalf("expected %d items, got %d", 0, f.items)
}
// Write 40 bytes
f.Append(1, getChunk(40, 0xDD))
batch := f.newBatch()
require.NoError(t, batch.AppendRaw(1, getChunk(40, 0xDD)))
require.NoError(t, batch.commit())
f.Close()
// Should have been truncated down to zero and then 40 written
if err := assertFileSize(fileToCrop, 40); err != nil {
t.Fatal(err)
@ -468,25 +472,26 @@ func TestFreezerReadAndTruncate(t *testing.T) {
t.Parallel()
rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge()
fname := fmt.Sprintf("read_truncate-%d", rand.Uint64())
{ // Fill table
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
// Fill table
{
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
// Write 15 bytes 30 times
for x := 0; x < 30; x++ {
data := getChunk(15, x)
f.Append(uint64(x), data)
}
writeChunks(t, f, 30, 15)
// The last item should be there
if _, err = f.Retrieve(f.items - 1); err != nil {
t.Fatal(err)
}
f.Close()
}
// Reopen and read all files
{
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
@ -497,40 +502,48 @@ func TestFreezerReadAndTruncate(t *testing.T) {
for y := byte(0); y < 30; y++ {
f.Retrieve(uint64(y))
}
// Now, truncate back to zero
f.truncate(0)
// Write the data again
batch := f.newBatch()
for x := 0; x < 30; x++ {
data := getChunk(15, ^x)
if err := f.Append(uint64(x), data); err != nil {
t.Fatalf("error %v", err)
}
require.NoError(t, batch.AppendRaw(uint64(x), getChunk(15, ^x)))
}
require.NoError(t, batch.commit())
f.Close()
}
}
func TestOffset(t *testing.T) {
func TestFreezerOffset(t *testing.T) {
t.Parallel()
rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge()
fname := fmt.Sprintf("offset-%d", rand.Uint64())
{ // Fill table
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 40, true)
// Fill table
{
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true)
if err != nil {
t.Fatal(err)
}
// Write 6 x 20 bytes, splitting out into three files
f.Append(0, getChunk(20, 0xFF))
f.Append(1, getChunk(20, 0xEE))
batch := f.newBatch()
require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF)))
require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE)))
f.Append(2, getChunk(20, 0xdd))
f.Append(3, getChunk(20, 0xcc))
require.NoError(t, batch.AppendRaw(2, getChunk(20, 0xdd)))
require.NoError(t, batch.AppendRaw(3, getChunk(20, 0xcc)))
f.Append(4, getChunk(20, 0xbb))
f.Append(5, getChunk(20, 0xaa))
f.DumpIndex(0, 100)
require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xbb)))
require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xaa)))
require.NoError(t, batch.commit())
t.Log(f.dumpIndexString(0, 100))
f.Close()
}
// Now crop it.
{
// delete files 0 and 1
@ -558,7 +571,7 @@ func TestOffset(t *testing.T) {
filenum: tailId,
offset: itemOffset,
}
buf := zeroIndex.marshallBinary()
buf := zeroIndex.append(nil)
// Overwrite index zero
copy(indexBuf, buf)
// Remove the four next indices by overwriting
@ -567,44 +580,36 @@ func TestOffset(t *testing.T) {
// Need to truncate the moved index items
indexFile.Truncate(indexEntrySize * (1 + 2))
indexFile.Close()
}
// Now open again
checkPresent := func(numDeleted uint64) {
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 40, true)
{
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true)
if err != nil {
t.Fatal(err)
}
f.DumpIndex(0, 100)
// It should allow writing item 6
f.Append(numDeleted+2, getChunk(20, 0x99))
defer f.Close()
t.Log(f.dumpIndexString(0, 100))
// It should be fine to fetch 4,5,6
if got, err := f.Retrieve(numDeleted); err != nil {
t.Fatal(err)
} else if exp := getChunk(20, 0xbb); !bytes.Equal(got, exp) {
t.Fatalf("expected %x got %x", exp, got)
}
if got, err := f.Retrieve(numDeleted + 1); err != nil {
t.Fatal(err)
} else if exp := getChunk(20, 0xaa); !bytes.Equal(got, exp) {
t.Fatalf("expected %x got %x", exp, got)
}
if got, err := f.Retrieve(numDeleted + 2); err != nil {
t.Fatal(err)
} else if exp := getChunk(20, 0x99); !bytes.Equal(got, exp) {
t.Fatalf("expected %x got %x", exp, got)
// It should allow writing item 6.
batch := f.newBatch()
require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x99)))
require.NoError(t, batch.commit())
checkRetrieveError(t, f, map[uint64]error{
0: errOutOfBounds,
1: errOutOfBounds,
2: errOutOfBounds,
3: errOutOfBounds,
})
checkRetrieve(t, f, map[uint64][]byte{
4: getChunk(20, 0xbb),
5: getChunk(20, 0xaa),
6: getChunk(20, 0x99),
})
}
// It should error at 0, 1,2,3
for i := numDeleted - 1; i > numDeleted-10; i-- {
if _, err := f.Retrieve(i); err == nil {
t.Fatal("expected err")
}
}
}
checkPresent(4)
// Now, let's pretend we have deleted 1M items
// Edit the index again, with a much larger initial offset of 1M.
{
// Read the index file
p := filepath.Join(os.TempDir(), fmt.Sprintf("%v.ridx", fname))
@ -624,13 +629,71 @@ func TestOffset(t *testing.T) {
offset: itemOffset,
filenum: tailId,
}
buf := zeroIndex.marshallBinary()
buf := zeroIndex.append(nil)
// Overwrite index zero
copy(indexBuf, buf)
indexFile.WriteAt(indexBuf, 0)
indexFile.Close()
}
checkPresent(1000000)
// Check that existing items have been moved to index 1M.
{
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true)
if err != nil {
t.Fatal(err)
}
defer f.Close()
t.Log(f.dumpIndexString(0, 100))
checkRetrieveError(t, f, map[uint64]error{
0: errOutOfBounds,
1: errOutOfBounds,
2: errOutOfBounds,
3: errOutOfBounds,
999999: errOutOfBounds,
})
checkRetrieve(t, f, map[uint64][]byte{
1000000: getChunk(20, 0xbb),
1000001: getChunk(20, 0xaa),
})
}
}
func checkRetrieve(t *testing.T, f *freezerTable, items map[uint64][]byte) {
t.Helper()
for item, wantBytes := range items {
value, err := f.Retrieve(item)
if err != nil {
t.Fatalf("can't get expected item %d: %v", item, err)
}
if !bytes.Equal(value, wantBytes) {
t.Fatalf("item %d has wrong value %x (want %x)", item, value, wantBytes)
}
}
}
func checkRetrieveError(t *testing.T, f *freezerTable, items map[uint64]error) {
t.Helper()
for item, wantError := range items {
value, err := f.Retrieve(item)
if err == nil {
t.Fatalf("unexpected value %x for item %d, want error %v", item, value, wantError)
}
if err != wantError {
t.Fatalf("wrong error for item %d: %v", item, err)
}
}
}
// Gets a chunk of data, filled with 'b'
func getChunk(size int, b int) []byte {
data := make([]byte, size)
for i := range data {
data[i] = byte(b)
}
return data
}
// TODO (?)
@ -644,52 +707,17 @@ func TestOffset(t *testing.T) {
// should be handled already, and the case described above can only (?) happen if an
// external process/user deletes files from the filesystem.
// TestAppendTruncateParallel is a test to check if the Append/truncate operations are
// racy.
//
// The reason why it's not a regular fuzzer, within tests/fuzzers, is that it is dependent
// on timing rather than 'clever' input -- there's no determinism.
func TestAppendTruncateParallel(t *testing.T) {
dir, err := ioutil.TempDir("", "freezer")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
func writeChunks(t *testing.T, ft *freezerTable, n int, length int) {
t.Helper()
f, err := newCustomTable(dir, "tmp", metrics.NilMeter{}, metrics.NilMeter{}, metrics.NilGauge{}, 8, true)
if err != nil {
t.Fatal(err)
}
fill := func(mark uint64) []byte {
data := make([]byte, 8)
binary.LittleEndian.PutUint64(data, mark)
return data
}
for i := 0; i < 5000; i++ {
f.truncate(0)
data0 := fill(0)
f.Append(0, data0)
data1 := fill(1)
var wg sync.WaitGroup
wg.Add(2)
go func() {
f.truncate(0)
wg.Done()
}()
go func() {
f.Append(1, data1)
wg.Done()
}()
wg.Wait()
if have, err := f.Retrieve(0); err == nil {
if !bytes.Equal(have, data0) {
t.Fatalf("have %x want %x", have, data0)
batch := ft.newBatch()
for i := 0; i < n; i++ {
if err := batch.AppendRaw(uint64(i), getChunk(length, i)); err != nil {
t.Fatalf("AppendRaw(%d, ...) returned error: %v", i, err)
}
}
if err := batch.commit(); err != nil {
t.Fatalf("Commit returned error: %v", err)
}
}
@ -698,20 +726,17 @@ func TestSequentialRead(t *testing.T) {
rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge()
fname := fmt.Sprintf("batchread-%d", rand.Uint64())
{ // Fill table
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
// Write 15 bytes 30 times
for x := 0; x < 30; x++ {
data := getChunk(15, x)
f.Append(uint64(x), data)
}
writeChunks(t, f, 30, 15)
f.DumpIndex(0, 30)
f.Close()
}
{ // Open it, iterate, verify iteration
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
@ -732,7 +757,7 @@ func TestSequentialRead(t *testing.T) {
}
{ // Open it, iterate, verify byte limit. The byte limit is less than item
// size, so each lookup should only return one item
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 40, true)
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true)
if err != nil {
t.Fatal(err)
}
@ -761,16 +786,13 @@ func TestSequentialReadByteLimit(t *testing.T) {
rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge()
fname := fmt.Sprintf("batchread-2-%d", rand.Uint64())
{ // Fill table
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 100, true)
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 100, true)
if err != nil {
t.Fatal(err)
}
// Write 10 bytes 30 times,
// Splitting it at every 100 bytes (10 items)
for x := 0; x < 30; x++ {
data := getChunk(10, x)
f.Append(uint64(x), data)
}
writeChunks(t, f, 30, 10)
f.Close()
}
for i, tc := range []struct {
@ -786,7 +808,7 @@ func TestSequentialReadByteLimit(t *testing.T) {
{100, 109, 10},
} {
{
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 100, true)
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 100, true)
if err != nil {
t.Fatal(err)
}

301
core/rawdb/freezer_test.go Normal file
View File

@ -0,0 +1,301 @@
// Copyright 2021 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package rawdb
import (
"bytes"
"errors"
"fmt"
"io/ioutil"
"math/big"
"math/rand"
"os"
"sync"
"testing"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp"
"github.com/stretchr/testify/require"
)
var freezerTestTableDef = map[string]bool{"test": true}
func TestFreezerModify(t *testing.T) {
t.Parallel()
// Create test data.
var valuesRaw [][]byte
var valuesRLP []*big.Int
for x := 0; x < 100; x++ {
v := getChunk(256, x)
valuesRaw = append(valuesRaw, v)
iv := big.NewInt(int64(x))
iv = iv.Exp(iv, iv, nil)
valuesRLP = append(valuesRLP, iv)
}
tables := map[string]bool{"raw": true, "rlp": false}
f, dir := newFreezerForTesting(t, tables)
defer os.RemoveAll(dir)
defer f.Close()
// Commit test data.
_, err := f.ModifyAncients(func(op ethdb.AncientWriteOp) error {
for i := range valuesRaw {
if err := op.AppendRaw("raw", uint64(i), valuesRaw[i]); err != nil {
return err
}
if err := op.Append("rlp", uint64(i), valuesRLP[i]); err != nil {
return err
}
}
return nil
})
if err != nil {
t.Fatal("ModifyAncients failed:", err)
}
// Dump indexes.
for _, table := range f.tables {
t.Log(table.name, "index:", table.dumpIndexString(0, int64(len(valuesRaw))))
}
// Read back test data.
checkAncientCount(t, f, "raw", uint64(len(valuesRaw)))
checkAncientCount(t, f, "rlp", uint64(len(valuesRLP)))
for i := range valuesRaw {
v, _ := f.Ancient("raw", uint64(i))
if !bytes.Equal(v, valuesRaw[i]) {
t.Fatalf("wrong raw value at %d: %x", i, v)
}
ivEnc, _ := f.Ancient("rlp", uint64(i))
want, _ := rlp.EncodeToBytes(valuesRLP[i])
if !bytes.Equal(ivEnc, want) {
t.Fatalf("wrong RLP value at %d: %x", i, ivEnc)
}
}
}
// This checks that ModifyAncients rolls back freezer updates
// when the function passed to it returns an error.
func TestFreezerModifyRollback(t *testing.T) {
t.Parallel()
f, dir := newFreezerForTesting(t, freezerTestTableDef)
defer os.RemoveAll(dir)
theError := errors.New("oops")
_, err := f.ModifyAncients(func(op ethdb.AncientWriteOp) error {
// Append three items. This creates two files immediately,
// because the table size limit of the test freezer is 2048.
require.NoError(t, op.AppendRaw("test", 0, make([]byte, 2048)))
require.NoError(t, op.AppendRaw("test", 1, make([]byte, 2048)))
require.NoError(t, op.AppendRaw("test", 2, make([]byte, 2048)))
return theError
})
if err != theError {
t.Errorf("ModifyAncients returned wrong error %q", err)
}
checkAncientCount(t, f, "test", 0)
f.Close()
// Reopen and check that the rolled-back data doesn't reappear.
tables := map[string]bool{"test": true}
f2, err := newFreezer(dir, "", false, 2049, tables)
if err != nil {
t.Fatalf("can't reopen freezer after failed ModifyAncients: %v", err)
}
defer f2.Close()
checkAncientCount(t, f2, "test", 0)
}
// This test runs ModifyAncients and Ancient concurrently with each other.
func TestFreezerConcurrentModifyRetrieve(t *testing.T) {
t.Parallel()
f, dir := newFreezerForTesting(t, freezerTestTableDef)
defer os.RemoveAll(dir)
defer f.Close()
var (
numReaders = 5
writeBatchSize = uint64(50)
written = make(chan uint64, numReaders*6)
wg sync.WaitGroup
)
wg.Add(numReaders + 1)
// Launch the writer. It appends 10000 items in batches.
go func() {
defer wg.Done()
defer close(written)
for item := uint64(0); item < 10000; item += writeBatchSize {
_, err := f.ModifyAncients(func(op ethdb.AncientWriteOp) error {
for i := uint64(0); i < writeBatchSize; i++ {
item := item + i
value := getChunk(32, int(item))
if err := op.AppendRaw("test", item, value); err != nil {
return err
}
}
return nil
})
if err != nil {
panic(err)
}
for i := 0; i < numReaders; i++ {
written <- item + writeBatchSize
}
}
}()
// Launch the readers. They read random items from the freezer up to the
// current frozen item count.
for i := 0; i < numReaders; i++ {
go func() {
defer wg.Done()
for frozen := range written {
for rc := 0; rc < 80; rc++ {
num := uint64(rand.Intn(int(frozen)))
value, err := f.Ancient("test", num)
if err != nil {
panic(fmt.Errorf("error reading %d (frozen %d): %v", num, frozen, err))
}
if !bytes.Equal(value, getChunk(32, int(num))) {
panic(fmt.Errorf("wrong value at %d", num))
}
}
}
}()
}
wg.Wait()
}
// This test runs ModifyAncients and TruncateAncients concurrently with each other.
func TestFreezerConcurrentModifyTruncate(t *testing.T) {
f, dir := newFreezerForTesting(t, freezerTestTableDef)
defer os.RemoveAll(dir)
defer f.Close()
var item = make([]byte, 256)
for i := 0; i < 1000; i++ {
// First reset and write 100 items.
if err := f.TruncateAncients(0); err != nil {
t.Fatal("truncate failed:", err)
}
_, err := f.ModifyAncients(func(op ethdb.AncientWriteOp) error {
for i := uint64(0); i < 100; i++ {
if err := op.AppendRaw("test", i, item); err != nil {
return err
}
}
return nil
})
if err != nil {
t.Fatal("modify failed:", err)
}
checkAncientCount(t, f, "test", 100)
// Now append 100 more items and truncate concurrently.
var (
wg sync.WaitGroup
truncateErr error
modifyErr error
)
wg.Add(3)
go func() {
_, modifyErr = f.ModifyAncients(func(op ethdb.AncientWriteOp) error {
for i := uint64(100); i < 200; i++ {
if err := op.AppendRaw("test", i, item); err != nil {
return err
}
}
return nil
})
wg.Done()
}()
go func() {
truncateErr = f.TruncateAncients(10)
wg.Done()
}()
go func() {
f.AncientSize("test")
wg.Done()
}()
wg.Wait()
// Now check the outcome. If the truncate operation went through first, the append
// fails, otherwise it succeeds. In either case, the freezer should be positioned
// at 10 after both operations are done.
if truncateErr != nil {
t.Fatal("concurrent truncate failed:", err)
}
if !(modifyErr == nil || modifyErr == errOutOrderInsertion) {
t.Fatal("wrong error from concurrent modify:", modifyErr)
}
checkAncientCount(t, f, "test", 10)
}
}
func newFreezerForTesting(t *testing.T, tables map[string]bool) (*freezer, string) {
t.Helper()
dir, err := ioutil.TempDir("", "freezer")
if err != nil {
t.Fatal(err)
}
// note: using low max table size here to ensure the tests actually
// switch between multiple files.
f, err := newFreezer(dir, "", false, 2049, tables)
if err != nil {
t.Fatal("can't open freezer", err)
}
return f, dir
}
// checkAncientCount verifies that the freezer contains n items.
func checkAncientCount(t *testing.T, f *freezer, kind string, n uint64) {
t.Helper()
if frozen, _ := f.Ancients(); frozen != n {
t.Fatalf("Ancients() returned %d, want %d", frozen, n)
}
// Check at index n-1.
if n > 0 {
index := n - 1
if ok, _ := f.HasAncient(kind, index); !ok {
t.Errorf("HasAncient(%q, %d) returned false unexpectedly", kind, index)
}
if _, err := f.Ancient(kind, index); err != nil {
t.Errorf("Ancient(%q, %d) returned unexpected error %q", kind, index, err)
}
}
// Check at index n.
index := n
if ok, _ := f.HasAncient(kind, index); ok {
t.Errorf("HasAncient(%q, %d) returned true unexpectedly", kind, index)
}
if _, err := f.Ancient(kind, index); err == nil {
t.Errorf("Ancient(%q, %d) didn't return expected error", kind, index)
} else if err != errOutOfBounds {
t.Errorf("Ancient(%q, %d) returned unexpected error %q", kind, index, err)
}
}

View File

@ -80,10 +80,9 @@ func (t *table) AncientSize(kind string) (uint64, error) {
return t.db.AncientSize(kind)
}
// AppendAncient is a noop passthrough that just forwards the request to the underlying
// database.
func (t *table) AppendAncient(number uint64, hash, header, body, receipts, td []byte) error {
return t.db.AppendAncient(number, hash, header, body, receipts, td)
// ModifyAncients runs an ancient write operation on the underlying database.
func (t *table) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (int64, error) {
return t.db.ModifyAncients(fn)
}
// TruncateAncients is a noop passthrough that just forwards the request to the underlying

View File

@ -92,9 +92,10 @@ type AncientReader interface {
// AncientWriter contains the methods required to write to immutable ancient data.
type AncientWriter interface {
// AppendAncient injects all binary blobs belong to block at the end of the
// append-only immutable table files.
AppendAncient(number uint64, hash, header, body, receipt, td []byte) error
// ModifyAncients runs a write operation on the ancient store.
// If the function returns an error, any changes to the underlying store are reverted.
// The integer return value is the total size of the written data.
ModifyAncients(func(AncientWriteOp) error) (int64, error)
// TruncateAncients discards all but the first n ancient data from the ancient store.
TruncateAncients(n uint64) error
@ -103,6 +104,15 @@ type AncientWriter interface {
Sync() error
}
// AncientWriteOp is given to the function argument of ModifyAncients.
type AncientWriteOp interface {
// Append adds an RLP-encoded item.
Append(kind string, number uint64, item interface{}) error
// AppendRaw adds an item without RLP-encoding it.
AppendRaw(kind string, number uint64, item []byte) error
}
// Reader contains the methods required to read data from both key-value as well as
// immutable ancient data.
type Reader interface {