core: handle importing known blocks more gracefully (#19417)

* core: import known blocks if they can be inserted as canonical blocks

* core: insert knowns blocks

* core: remove useless

* core: doesn't process head block in reorg function
This commit is contained in:
gary rong 2019-05-08 19:30:36 +08:00 committed by Péter Szilágyi
parent 78477e4118
commit c113723fdb
2 changed files with 303 additions and 125 deletions

View File

@ -883,10 +883,10 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
var lastWrite uint64
// WriteBlockWithoutState writes only the block and its metadata to the database,
// writeBlockWithoutState writes only the block and its metadata to the database,
// but does not write any state. This is used to construct competing side forks
// up to the point where they exceed the canonical total difficulty.
func (bc *BlockChain) WriteBlockWithoutState(block *types.Block, td *big.Int) (err error) {
func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (err error) {
bc.wg.Add(1)
defer bc.wg.Done()
@ -898,6 +898,26 @@ func (bc *BlockChain) WriteBlockWithoutState(block *types.Block, td *big.Int) (e
return nil
}
// writeKnownBlock updates the head block flag with a known block
// and introduces chain reorg if necessary.
func (bc *BlockChain) writeKnownBlock(block *types.Block) error {
bc.wg.Add(1)
defer bc.wg.Done()
current := bc.CurrentBlock()
if block.ParentHash() != current.Hash() {
if err := bc.reorg(current, block); err != nil {
return err
}
}
// Write the positional metadata for transaction/receipt lookups.
// Preimages here is empty, ignore it.
rawdb.WriteTxLookupEntries(bc.db, block)
bc.insert(block)
return nil
}
// WriteBlockWithState writes the block and all associated state to the database.
func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) {
bc.chainmu.Lock()
@ -1139,18 +1159,42 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
// 2. The block is stored as a sidechain, and is lying about it's stateroot, and passes a stateroot
// from the canonical chain, which has not been verified.
// Skip all known blocks that are behind us
current := bc.CurrentBlock().NumberU64()
for block != nil && err == ErrKnownBlock && current >= block.NumberU64() {
var (
current = bc.CurrentBlock()
localTd = bc.GetTd(current.Hash(), current.NumberU64())
externTd = bc.GetTd(block.ParentHash(), block.NumberU64()-1) // The first block can't be nil
)
for block != nil && err == ErrKnownBlock {
externTd = new(big.Int).Add(externTd, block.Difficulty())
if localTd.Cmp(externTd) < 0 {
break
}
stats.ignored++
block, err = it.next()
}
// The remaining blocks are still known blocks, the only scenario here is:
// During the fast sync, the pivot point is already submitted but rollback
// happens. Then node resets the head full block to a lower height via `rollback`
// and leaves a few known blocks in the database.
//
// When node runs a fast sync again, it can re-import a batch of known blocks via
// `insertChain` while a part of them have higher total difficulty than current
// head full block(new pivot point).
for block != nil && err == ErrKnownBlock {
if err := bc.writeKnownBlock(block); err != nil {
return it.index, nil, nil, err
}
lastCanon = block
block, err = it.next()
}
// Falls through to the block import
}
switch {
// First block is pruned, insert as sidechain and reorg only if TD grows enough
case err == consensus.ErrPrunedAncestor:
return bc.insertSidechain(block, it)
return bc.insertSideChain(block, it)
// First block is future, shove it (and all children) to the future queue (unknown ancestor)
case err == consensus.ErrFutureBlock || (err == consensus.ErrUnknownAncestor && bc.futureBlocks.Contains(it.first().ParentHash())):
@ -1313,13 +1357,13 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
return it.index, events, coalescedLogs, err
}
// insertSidechain is called when an import batch hits upon a pruned ancestor
// insertSideChain is called when an import batch hits upon a pruned ancestor
// error, which happens when a sidechain with a sufficiently old fork-block is
// found.
//
// The method writes all (header-and-body-valid) blocks to disk, then tries to
// switch over to the new chain if the TD exceeded the current chain.
func (bc *BlockChain) insertSidechain(block *types.Block, it *insertIterator) (int, []interface{}, []*types.Log, error) {
func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (int, []interface{}, []*types.Log, error) {
var (
externTd *big.Int
current = bc.CurrentBlock()
@ -1360,7 +1404,7 @@ func (bc *BlockChain) insertSidechain(block *types.Block, it *insertIterator) (i
if !bc.HasBlock(block.Hash(), block.NumberU64()) {
start := time.Now()
if err := bc.WriteBlockWithoutState(block, externTd); err != nil {
if err := bc.writeBlockWithoutState(block, externTd); err != nil {
return it.index, nil, nil, err
}
log.Debug("Injected sidechain block", "number", block.Number(), "hash", block.Hash(),
@ -1524,15 +1568,15 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
} else {
log.Error("Impossible reorg, please file an issue", "oldnum", oldBlock.Number(), "oldhash", oldBlock.Hash(), "newnum", newBlock.Number(), "newhash", newBlock.Hash())
}
// Insert the new chain, taking care of the proper incremental order
for i := len(newChain) - 1; i >= 0; i-- {
// Insert the new chain(except the head block(reverse order)),
// taking care of the proper incremental order.
for i := len(newChain) - 1; i >= 1; i-- {
// Insert the block in the canonical way, re-writing history
bc.insert(newChain[i])
// Collect reborn logs due to chain reorg (except head block (reverse order))
if i != 0 {
collectLogs(newChain[i].Hash(), false)
}
// Collect reborn logs due to chain reorg
collectLogs(newChain[i].Hash(), false)
// Write lookup entries for hash based transaction/receipt searches
rawdb.WriteTxLookupEntries(bc.db, newChain[i])
addedTxs = append(addedTxs, newChain[i].Transactions()...)

View File

@ -1564,117 +1564,6 @@ func TestLargeReorgTrieGC(t *testing.T) {
}
}
// Benchmarks large blocks with value transfers to non-existing accounts
func benchmarkLargeNumberOfValueToNonexisting(b *testing.B, numTxs, numBlocks int, recipientFn func(uint64) common.Address, dataFn func(uint64) []byte) {
var (
signer = types.HomesteadSigner{}
testBankKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
testBankAddress = crypto.PubkeyToAddress(testBankKey.PublicKey)
bankFunds = big.NewInt(100000000000000000)
gspec = Genesis{
Config: params.TestChainConfig,
Alloc: GenesisAlloc{
testBankAddress: {Balance: bankFunds},
common.HexToAddress("0xc0de"): {
Code: []byte{0x60, 0x01, 0x50},
Balance: big.NewInt(0),
}, // push 1, pop
},
GasLimit: 100e6, // 100 M
}
)
// Generate the original common chain segment and the two competing forks
engine := ethash.NewFaker()
db := rawdb.NewMemoryDatabase()
genesis := gspec.MustCommit(db)
blockGenerator := func(i int, block *BlockGen) {
block.SetCoinbase(common.Address{1})
for txi := 0; txi < numTxs; txi++ {
uniq := uint64(i*numTxs + txi)
recipient := recipientFn(uniq)
//recipient := common.BigToAddress(big.NewInt(0).SetUint64(1337 + uniq))
tx, err := types.SignTx(types.NewTransaction(uniq, recipient, big.NewInt(1), params.TxGas, big.NewInt(1), nil), signer, testBankKey)
if err != nil {
b.Error(err)
}
block.AddTx(tx)
}
}
shared, _ := GenerateChain(params.TestChainConfig, genesis, engine, db, numBlocks, blockGenerator)
b.StopTimer()
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Import the shared chain and the original canonical one
diskdb := rawdb.NewMemoryDatabase()
gspec.MustCommit(diskdb)
chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}, nil)
if err != nil {
b.Fatalf("failed to create tester chain: %v", err)
}
b.StartTimer()
if _, err := chain.InsertChain(shared); err != nil {
b.Fatalf("failed to insert shared chain: %v", err)
}
b.StopTimer()
if got := chain.CurrentBlock().Transactions().Len(); got != numTxs*numBlocks {
b.Fatalf("Transactions were not included, expected %d, got %d", numTxs*numBlocks, got)
}
}
}
func BenchmarkBlockChain_1x1000ValueTransferToNonexisting(b *testing.B) {
var (
numTxs = 1000
numBlocks = 1
)
recipientFn := func(nonce uint64) common.Address {
return common.BigToAddress(big.NewInt(0).SetUint64(1337 + nonce))
}
dataFn := func(nonce uint64) []byte {
return nil
}
benchmarkLargeNumberOfValueToNonexisting(b, numTxs, numBlocks, recipientFn, dataFn)
}
func BenchmarkBlockChain_1x1000ValueTransferToExisting(b *testing.B) {
var (
numTxs = 1000
numBlocks = 1
)
b.StopTimer()
b.ResetTimer()
recipientFn := func(nonce uint64) common.Address {
return common.BigToAddress(big.NewInt(0).SetUint64(1337))
}
dataFn := func(nonce uint64) []byte {
return nil
}
benchmarkLargeNumberOfValueToNonexisting(b, numTxs, numBlocks, recipientFn, dataFn)
}
func BenchmarkBlockChain_1x1000Executions(b *testing.B) {
var (
numTxs = 1000
numBlocks = 1
)
b.StopTimer()
b.ResetTimer()
recipientFn := func(nonce uint64) common.Address {
return common.BigToAddress(big.NewInt(0).SetUint64(0xc0de))
}
dataFn := func(nonce uint64) []byte {
return nil
}
benchmarkLargeNumberOfValueToNonexisting(b, numTxs, numBlocks, recipientFn, dataFn)
}
// Tests that importing a very large side fork, which is larger than the canon chain,
// but where the difficulty per block is kept low: this means that it will not
// overtake the 'canon' chain until after it's passed canon by about 200 blocks.
@ -1812,6 +1701,138 @@ func TestPrunedImportSide(t *testing.T) {
testSideImport(t, 1, -10)
}
func TestInsertKnownHeaders(t *testing.T) { testInsertKnownChainData(t, "headers") }
func TestInsertKnownReceiptChain(t *testing.T) { testInsertKnownChainData(t, "receipts") }
func TestInsertKnownBlocks(t *testing.T) { testInsertKnownChainData(t, "blocks") }
func testInsertKnownChainData(t *testing.T, typ string) {
engine := ethash.NewFaker()
db := rawdb.NewMemoryDatabase()
genesis := new(Genesis).MustCommit(db)
blocks, receipts := GenerateChain(params.TestChainConfig, genesis, engine, db, 32, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{1}) })
// A longer chain but total difficulty is lower.
blocks2, receipts2 := GenerateChain(params.TestChainConfig, blocks[len(blocks)-1], engine, db, 65, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{1}) })
// A shorter chain but total difficulty is higher.
blocks3, receipts3 := GenerateChain(params.TestChainConfig, blocks[len(blocks)-1], engine, db, 64, func(i int, b *BlockGen) {
b.SetCoinbase(common.Address{1})
b.OffsetTime(-9) // A higher difficulty
})
// Import the shared chain and the original canonical one
chaindb := rawdb.NewMemoryDatabase()
new(Genesis).MustCommit(chaindb)
chain, err := NewBlockChain(chaindb, nil, params.TestChainConfig, engine, vm.Config{}, nil)
if err != nil {
t.Fatalf("failed to create tester chain: %v", err)
}
var (
inserter func(blocks []*types.Block, receipts []types.Receipts) error
asserter func(t *testing.T, block *types.Block)
)
headers, headers2 := make([]*types.Header, 0, len(blocks)), make([]*types.Header, 0, len(blocks2))
for _, block := range blocks {
headers = append(headers, block.Header())
}
for _, block := range blocks2 {
headers2 = append(headers2, block.Header())
}
if typ == "headers" {
inserter = func(blocks []*types.Block, receipts []types.Receipts) error {
headers := make([]*types.Header, 0, len(blocks))
for _, block := range blocks {
headers = append(headers, block.Header())
}
_, err := chain.InsertHeaderChain(headers, 1)
return err
}
asserter = func(t *testing.T, block *types.Block) {
if chain.CurrentHeader().Hash() != block.Hash() {
t.Fatalf("current head header mismatch, have %v, want %v", chain.CurrentHeader().Hash().Hex(), block.Hash().Hex())
}
}
} else if typ == "receipts" {
inserter = func(blocks []*types.Block, receipts []types.Receipts) error {
headers := make([]*types.Header, 0, len(blocks))
for _, block := range blocks {
headers = append(headers, block.Header())
}
_, err := chain.InsertHeaderChain(headers, 1)
if err != nil {
return err
}
_, err = chain.InsertReceiptChain(blocks, receipts)
return err
}
asserter = func(t *testing.T, block *types.Block) {
if chain.CurrentFastBlock().Hash() != block.Hash() {
t.Fatalf("current head fast block mismatch, have %v, want %v", chain.CurrentFastBlock().Hash().Hex(), block.Hash().Hex())
}
}
} else {
inserter = func(blocks []*types.Block, receipts []types.Receipts) error {
_, err := chain.InsertChain(blocks)
return err
}
asserter = func(t *testing.T, block *types.Block) {
if chain.CurrentBlock().Hash() != block.Hash() {
t.Fatalf("current head block mismatch, have %v, want %v", chain.CurrentBlock().Hash().Hex(), block.Hash().Hex())
}
}
}
if err := inserter(blocks, receipts); err != nil {
t.Fatalf("failed to insert chain data: %v", err)
}
// Reimport the chain data again. All the imported
// chain data are regarded "known" data.
if err := inserter(blocks, receipts); err != nil {
t.Fatalf("failed to insert chain data: %v", err)
}
asserter(t, blocks[len(blocks)-1])
// Import a long canonical chain with some known data as prefix.
var rollback []common.Hash
for i := len(blocks) / 2; i < len(blocks); i++ {
rollback = append(rollback, blocks[i].Hash())
}
chain.Rollback(rollback)
if err := inserter(append(blocks, blocks2...), append(receipts, receipts2...)); err != nil {
t.Fatalf("failed to insert chain data: %v", err)
}
asserter(t, blocks2[len(blocks2)-1])
// Import a heavier shorter but higher total difficulty chain with some known data as prefix.
if err := inserter(append(blocks, blocks3...), append(receipts, receipts3...)); err != nil {
t.Fatalf("failed to insert chain data: %v", err)
}
asserter(t, blocks3[len(blocks3)-1])
// Import a longer but lower total difficulty chain with some known data as prefix.
if err := inserter(append(blocks, blocks2...), append(receipts, receipts2...)); err != nil {
t.Fatalf("failed to insert chain data: %v", err)
}
// The head shouldn't change.
asserter(t, blocks3[len(blocks3)-1])
if typ != "headers" {
// Rollback the heavier chain and re-insert the longer chain again
for i := 0; i < len(blocks3); i++ {
rollback = append(rollback, blocks3[i].Hash())
}
chain.Rollback(rollback)
if err := inserter(append(blocks, blocks2...), append(receipts, receipts2...)); err != nil {
t.Fatalf("failed to insert chain data: %v", err)
}
asserter(t, blocks2[len(blocks2)-1])
}
}
// getLongAndShortChains returns two chains,
// A is longer, B is heavier
func getLongAndShortChains() (*BlockChain, []*types.Block, []*types.Block, error) {
@ -1931,3 +1952,116 @@ func TestReorgToShorterRemovesCanonMappingHeaderChain(t *testing.T) {
t.Errorf("expected header to be gone: %v", headerByNum.Number.Uint64())
}
}
// Benchmarks large blocks with value transfers to non-existing accounts
func benchmarkLargeNumberOfValueToNonexisting(b *testing.B, numTxs, numBlocks int, recipientFn func(uint64) common.Address, dataFn func(uint64) []byte) {
var (
signer = types.HomesteadSigner{}
testBankKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
testBankAddress = crypto.PubkeyToAddress(testBankKey.PublicKey)
bankFunds = big.NewInt(100000000000000000)
gspec = Genesis{
Config: params.TestChainConfig,
Alloc: GenesisAlloc{
testBankAddress: {Balance: bankFunds},
common.HexToAddress("0xc0de"): {
Code: []byte{0x60, 0x01, 0x50},
Balance: big.NewInt(0),
}, // push 1, pop
},
GasLimit: 100e6, // 100 M
}
)
// Generate the original common chain segment and the two competing forks
engine := ethash.NewFaker()
db := rawdb.NewMemoryDatabase()
genesis := gspec.MustCommit(db)
blockGenerator := func(i int, block *BlockGen) {
block.SetCoinbase(common.Address{1})
for txi := 0; txi < numTxs; txi++ {
uniq := uint64(i*numTxs + txi)
recipient := recipientFn(uniq)
tx, err := types.SignTx(types.NewTransaction(uniq, recipient, big.NewInt(1), params.TxGas, big.NewInt(1), nil), signer, testBankKey)
if err != nil {
b.Error(err)
}
block.AddTx(tx)
}
}
shared, _ := GenerateChain(params.TestChainConfig, genesis, engine, db, numBlocks, blockGenerator)
b.StopTimer()
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Import the shared chain and the original canonical one
diskdb := rawdb.NewMemoryDatabase()
gspec.MustCommit(diskdb)
chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}, nil)
if err != nil {
b.Fatalf("failed to create tester chain: %v", err)
}
b.StartTimer()
if _, err := chain.InsertChain(shared); err != nil {
b.Fatalf("failed to insert shared chain: %v", err)
}
b.StopTimer()
if got := chain.CurrentBlock().Transactions().Len(); got != numTxs*numBlocks {
b.Fatalf("Transactions were not included, expected %d, got %d", numTxs*numBlocks, got)
}
}
}
func BenchmarkBlockChain_1x1000ValueTransferToNonexisting(b *testing.B) {
var (
numTxs = 1000
numBlocks = 1
)
recipientFn := func(nonce uint64) common.Address {
return common.BigToAddress(big.NewInt(0).SetUint64(1337 + nonce))
}
dataFn := func(nonce uint64) []byte {
return nil
}
benchmarkLargeNumberOfValueToNonexisting(b, numTxs, numBlocks, recipientFn, dataFn)
}
func BenchmarkBlockChain_1x1000ValueTransferToExisting(b *testing.B) {
var (
numTxs = 1000
numBlocks = 1
)
b.StopTimer()
b.ResetTimer()
recipientFn := func(nonce uint64) common.Address {
return common.BigToAddress(big.NewInt(0).SetUint64(1337))
}
dataFn := func(nonce uint64) []byte {
return nil
}
benchmarkLargeNumberOfValueToNonexisting(b, numTxs, numBlocks, recipientFn, dataFn)
}
func BenchmarkBlockChain_1x1000Executions(b *testing.B) {
var (
numTxs = 1000
numBlocks = 1
)
b.StopTimer()
b.ResetTimer()
recipientFn := func(nonce uint64) common.Address {
return common.BigToAddress(big.NewInt(0).SetUint64(0xc0de))
}
dataFn := func(nonce uint64) []byte {
return nil
}
benchmarkLargeNumberOfValueToNonexisting(b, numTxs, numBlocks, recipientFn, dataFn)
}