core, miner: remove PostChainEvents (#19396)

This change:

- removes the PostChainEvents method on core.BlockChain.
- sorts 'removed log' events by block number.
- fire the NewChainHead event if we inject a canonical block into the chain
  even if the entire insertion is not successful.
- guarantees correct event ordering in all cases.
This commit is contained in:
gary rong 2019-11-29 21:22:08 +08:00 committed by Felix Lange
parent 5cc6e7a71e
commit fc7e0fe6c7
4 changed files with 163 additions and 161 deletions

View File

@ -1260,16 +1260,16 @@ func (bc *BlockChain) writeKnownBlock(block *types.Block) error {
} }
// WriteBlockWithState writes the block and all associated state to the database. // WriteBlockWithState writes the block and all associated state to the database.
func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) { func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) {
bc.chainmu.Lock() bc.chainmu.Lock()
defer bc.chainmu.Unlock() defer bc.chainmu.Unlock()
return bc.writeBlockWithState(block, receipts, state) return bc.writeBlockWithState(block, receipts, logs, state, emitHeadEvent)
} }
// writeBlockWithState writes the block and all associated state to the database, // writeBlockWithState writes the block and all associated state to the database,
// but is expects the chain mutex to be held. // but is expects the chain mutex to be held.
func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) { func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) {
bc.wg.Add(1) bc.wg.Add(1)
defer bc.wg.Done() defer bc.wg.Done()
@ -1394,6 +1394,23 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
bc.insert(block) bc.insert(block)
} }
bc.futureBlocks.Remove(block.Hash()) bc.futureBlocks.Remove(block.Hash())
if status == CanonStatTy {
bc.chainFeed.Send(ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
if len(logs) > 0 {
bc.logsFeed.Send(logs)
}
// In theory we should fire a ChainHeadEvent when we inject
// a canonical block, but sometimes we can insert a batch of
// canonicial blocks. Avoid firing too much ChainHeadEvents,
// we will fire an accumulated ChainHeadEvent and disable fire
// event here.
if emitHeadEvent {
bc.chainHeadFeed.Send(ChainHeadEvent{Block: block})
}
} else {
bc.chainSideFeed.Send(ChainSideEvent{Block: block})
}
return status, nil return status, nil
} }
@ -1444,11 +1461,10 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
// Pre-checks passed, start the full block imports // Pre-checks passed, start the full block imports
bc.wg.Add(1) bc.wg.Add(1)
bc.chainmu.Lock() bc.chainmu.Lock()
n, events, logs, err := bc.insertChain(chain, true) n, err := bc.insertChain(chain, true)
bc.chainmu.Unlock() bc.chainmu.Unlock()
bc.wg.Done() bc.wg.Done()
bc.PostChainEvents(events, logs)
return n, err return n, err
} }
@ -1460,23 +1476,24 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
// racey behaviour. If a sidechain import is in progress, and the historic state // racey behaviour. If a sidechain import is in progress, and the historic state
// is imported, but then new canon-head is added before the actual sidechain // is imported, but then new canon-head is added before the actual sidechain
// completes, then the historic state could be pruned again // completes, then the historic state could be pruned again
func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) { func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, error) {
// If the chain is terminating, don't even bother starting up // If the chain is terminating, don't even bother starting up
if atomic.LoadInt32(&bc.procInterrupt) == 1 { if atomic.LoadInt32(&bc.procInterrupt) == 1 {
return 0, nil, nil, nil return 0, nil
} }
// Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss) // Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss)
senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain) senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain)
// A queued approach to delivering events. This is generally
// faster than direct delivery and requires much less mutex
// acquiring.
var ( var (
stats = insertStats{startTime: mclock.Now()} stats = insertStats{startTime: mclock.Now()}
events = make([]interface{}, 0, len(chain))
lastCanon *types.Block lastCanon *types.Block
coalescedLogs []*types.Log
) )
// Fire a single chain head event if we've progressed the chain
defer func() {
if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
bc.chainHeadFeed.Send(ChainHeadEvent{lastCanon})
}
}()
// Start the parallel header verifier // Start the parallel header verifier
headers := make([]*types.Header, len(chain)) headers := make([]*types.Header, len(chain))
seals := make([]bool, len(chain)) seals := make([]bool, len(chain))
@ -1526,7 +1543,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
for block != nil && err == ErrKnownBlock { for block != nil && err == ErrKnownBlock {
log.Debug("Writing previously known block", "number", block.Number(), "hash", block.Hash()) log.Debug("Writing previously known block", "number", block.Number(), "hash", block.Hash())
if err := bc.writeKnownBlock(block); err != nil { if err := bc.writeKnownBlock(block); err != nil {
return it.index, nil, nil, err return it.index, err
} }
lastCanon = block lastCanon = block
@ -1545,7 +1562,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
for block != nil && (it.index == 0 || err == consensus.ErrUnknownAncestor) { for block != nil && (it.index == 0 || err == consensus.ErrUnknownAncestor) {
log.Debug("Future block, postponing import", "number", block.Number(), "hash", block.Hash()) log.Debug("Future block, postponing import", "number", block.Number(), "hash", block.Hash())
if err := bc.addFutureBlock(block); err != nil { if err := bc.addFutureBlock(block); err != nil {
return it.index, events, coalescedLogs, err return it.index, err
} }
block, err = it.next() block, err = it.next()
} }
@ -1553,14 +1570,14 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
stats.ignored += it.remaining() stats.ignored += it.remaining()
// If there are any still remaining, mark as ignored // If there are any still remaining, mark as ignored
return it.index, events, coalescedLogs, err return it.index, err
// Some other error occurred, abort // Some other error occurred, abort
case err != nil: case err != nil:
bc.futureBlocks.Remove(block.Hash()) bc.futureBlocks.Remove(block.Hash())
stats.ignored += len(it.chain) stats.ignored += len(it.chain)
bc.reportBlock(block, nil, err) bc.reportBlock(block, nil, err)
return it.index, events, coalescedLogs, err return it.index, err
} }
// No validation errors for the first block (or chain prefix skipped) // No validation errors for the first block (or chain prefix skipped)
for ; block != nil && err == nil || err == ErrKnownBlock; block, err = it.next() { for ; block != nil && err == nil || err == ErrKnownBlock; block, err = it.next() {
@ -1572,7 +1589,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
// If the header is a banned one, straight out abort // If the header is a banned one, straight out abort
if BadHashes[block.Hash()] { if BadHashes[block.Hash()] {
bc.reportBlock(block, nil, ErrBlacklistedHash) bc.reportBlock(block, nil, ErrBlacklistedHash)
return it.index, events, coalescedLogs, ErrBlacklistedHash return it.index, ErrBlacklistedHash
} }
// If the block is known (in the middle of the chain), it's a special case for // If the block is known (in the middle of the chain), it's a special case for
// Clique blocks where they can share state among each other, so importing an // Clique blocks where they can share state among each other, so importing an
@ -1589,15 +1606,13 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
"root", block.Root()) "root", block.Root())
if err := bc.writeKnownBlock(block); err != nil { if err := bc.writeKnownBlock(block); err != nil {
return it.index, nil, nil, err return it.index, err
} }
stats.processed++ stats.processed++
// We can assume that logs are empty here, since the only way for consecutive // We can assume that logs are empty here, since the only way for consecutive
// Clique blocks to have the same state is if there are no transactions. // Clique blocks to have the same state is if there are no transactions.
events = append(events, ChainEvent{block, block.Hash(), nil})
lastCanon = block lastCanon = block
continue continue
} }
// Retrieve the parent block and it's state to execute on top // Retrieve the parent block and it's state to execute on top
@ -1609,7 +1624,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
} }
statedb, err := state.New(parent.Root, bc.stateCache) statedb, err := state.New(parent.Root, bc.stateCache)
if err != nil { if err != nil {
return it.index, events, coalescedLogs, err return it.index, err
} }
// If we have a followup block, run that against the current state to pre-cache // If we have a followup block, run that against the current state to pre-cache
// transactions and probabilistically some of the account/storage trie nodes. // transactions and probabilistically some of the account/storage trie nodes.
@ -1634,7 +1649,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
if err != nil { if err != nil {
bc.reportBlock(block, receipts, err) bc.reportBlock(block, receipts, err)
atomic.StoreUint32(&followupInterrupt, 1) atomic.StoreUint32(&followupInterrupt, 1)
return it.index, events, coalescedLogs, err return it.index, err
} }
// Update the metrics touched during block processing // Update the metrics touched during block processing
accountReadTimer.Update(statedb.AccountReads) // Account reads are complete, we can mark them accountReadTimer.Update(statedb.AccountReads) // Account reads are complete, we can mark them
@ -1653,7 +1668,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil { if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
bc.reportBlock(block, receipts, err) bc.reportBlock(block, receipts, err)
atomic.StoreUint32(&followupInterrupt, 1) atomic.StoreUint32(&followupInterrupt, 1)
return it.index, events, coalescedLogs, err return it.index, err
} }
proctime := time.Since(start) proctime := time.Since(start)
@ -1665,10 +1680,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
// Write the block to the chain and get the status. // Write the block to the chain and get the status.
substart = time.Now() substart = time.Now()
status, err := bc.writeBlockWithState(block, receipts, statedb) status, err := bc.writeBlockWithState(block, receipts, logs, statedb, false)
if err != nil { if err != nil {
atomic.StoreUint32(&followupInterrupt, 1) atomic.StoreUint32(&followupInterrupt, 1)
return it.index, events, coalescedLogs, err return it.index, err
} }
atomic.StoreUint32(&followupInterrupt, 1) atomic.StoreUint32(&followupInterrupt, 1)
@ -1686,8 +1701,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
"elapsed", common.PrettyDuration(time.Since(start)), "elapsed", common.PrettyDuration(time.Since(start)),
"root", block.Root()) "root", block.Root())
coalescedLogs = append(coalescedLogs, logs...)
events = append(events, ChainEvent{block, block.Hash(), logs})
lastCanon = block lastCanon = block
// Only count canonical blocks for GC processing time // Only count canonical blocks for GC processing time
@ -1698,7 +1711,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
"diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(start)), "diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(start)),
"txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()), "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()),
"root", block.Root()) "root", block.Root())
events = append(events, ChainSideEvent{block})
default: default:
// This in theory is impossible, but lets be nice to our future selves and leave // This in theory is impossible, but lets be nice to our future selves and leave
@ -1717,24 +1729,20 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
// Any blocks remaining here? The only ones we care about are the future ones // Any blocks remaining here? The only ones we care about are the future ones
if block != nil && err == consensus.ErrFutureBlock { if block != nil && err == consensus.ErrFutureBlock {
if err := bc.addFutureBlock(block); err != nil { if err := bc.addFutureBlock(block); err != nil {
return it.index, events, coalescedLogs, err return it.index, err
} }
block, err = it.next() block, err = it.next()
for ; block != nil && err == consensus.ErrUnknownAncestor; block, err = it.next() { for ; block != nil && err == consensus.ErrUnknownAncestor; block, err = it.next() {
if err := bc.addFutureBlock(block); err != nil { if err := bc.addFutureBlock(block); err != nil {
return it.index, events, coalescedLogs, err return it.index, err
} }
stats.queued++ stats.queued++
} }
} }
stats.ignored += it.remaining() stats.ignored += it.remaining()
// Append a single chain head event if we've progressed the chain return it.index, err
if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
events = append(events, ChainHeadEvent{lastCanon})
}
return it.index, events, coalescedLogs, err
} }
// insertSideChain is called when an import batch hits upon a pruned ancestor // insertSideChain is called when an import batch hits upon a pruned ancestor
@ -1743,7 +1751,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
// //
// The method writes all (header-and-body-valid) blocks to disk, then tries to // The method writes all (header-and-body-valid) blocks to disk, then tries to
// switch over to the new chain if the TD exceeded the current chain. // switch over to the new chain if the TD exceeded the current chain.
func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (int, []interface{}, []*types.Log, error) { func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (int, error) {
var ( var (
externTd *big.Int externTd *big.Int
current = bc.CurrentBlock() current = bc.CurrentBlock()
@ -1779,7 +1787,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
// If someone legitimately side-mines blocks, they would still be imported as usual. However, // If someone legitimately side-mines blocks, they would still be imported as usual. However,
// we cannot risk writing unverified blocks to disk when they obviously target the pruning // we cannot risk writing unverified blocks to disk when they obviously target the pruning
// mechanism. // mechanism.
return it.index, nil, nil, errors.New("sidechain ghost-state attack") return it.index, errors.New("sidechain ghost-state attack")
} }
} }
if externTd == nil { if externTd == nil {
@ -1790,7 +1798,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
if !bc.HasBlock(block.Hash(), block.NumberU64()) { if !bc.HasBlock(block.Hash(), block.NumberU64()) {
start := time.Now() start := time.Now()
if err := bc.writeBlockWithoutState(block, externTd); err != nil { if err := bc.writeBlockWithoutState(block, externTd); err != nil {
return it.index, nil, nil, err return it.index, err
} }
log.Debug("Injected sidechain block", "number", block.Number(), "hash", block.Hash(), log.Debug("Injected sidechain block", "number", block.Number(), "hash", block.Hash(),
"diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(start)), "diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(start)),
@ -1807,7 +1815,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
localTd := bc.GetTd(current.Hash(), current.NumberU64()) localTd := bc.GetTd(current.Hash(), current.NumberU64())
if localTd.Cmp(externTd) > 0 { if localTd.Cmp(externTd) > 0 {
log.Info("Sidechain written to disk", "start", it.first().NumberU64(), "end", it.previous().Number, "sidetd", externTd, "localtd", localTd) log.Info("Sidechain written to disk", "start", it.first().NumberU64(), "end", it.previous().Number, "sidetd", externTd, "localtd", localTd)
return it.index, nil, nil, err return it.index, err
} }
// Gather all the sidechain hashes (full blocks may be memory heavy) // Gather all the sidechain hashes (full blocks may be memory heavy)
var ( var (
@ -1822,7 +1830,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
parent = bc.GetHeader(parent.ParentHash, parent.Number.Uint64()-1) parent = bc.GetHeader(parent.ParentHash, parent.Number.Uint64()-1)
} }
if parent == nil { if parent == nil {
return it.index, nil, nil, errors.New("missing parent") return it.index, errors.New("missing parent")
} }
// Import all the pruned blocks to make the state available // Import all the pruned blocks to make the state available
var ( var (
@ -1841,15 +1849,15 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
// memory here. // memory here.
if len(blocks) >= 2048 || memory > 64*1024*1024 { if len(blocks) >= 2048 || memory > 64*1024*1024 {
log.Info("Importing heavy sidechain segment", "blocks", len(blocks), "start", blocks[0].NumberU64(), "end", block.NumberU64()) log.Info("Importing heavy sidechain segment", "blocks", len(blocks), "start", blocks[0].NumberU64(), "end", block.NumberU64())
if _, _, _, err := bc.insertChain(blocks, false); err != nil { if _, err := bc.insertChain(blocks, false); err != nil {
return 0, nil, nil, err return 0, err
} }
blocks, memory = blocks[:0], 0 blocks, memory = blocks[:0], 0
// If the chain is terminating, stop processing blocks // If the chain is terminating, stop processing blocks
if atomic.LoadInt32(&bc.procInterrupt) == 1 { if atomic.LoadInt32(&bc.procInterrupt) == 1 {
log.Debug("Premature abort during blocks processing") log.Debug("Premature abort during blocks processing")
return 0, nil, nil, nil return 0, nil
} }
} }
} }
@ -1857,7 +1865,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
log.Info("Importing sidechain segment", "start", blocks[0].NumberU64(), "end", blocks[len(blocks)-1].NumberU64()) log.Info("Importing sidechain segment", "start", blocks[0].NumberU64(), "end", blocks[len(blocks)-1].NumberU64())
return bc.insertChain(blocks, false) return bc.insertChain(blocks, false)
} }
return 0, nil, nil, nil return 0, nil
} }
// reorg takes two blocks, an old chain and a new chain and will reconstruct the // reorg takes two blocks, an old chain and a new chain and will reconstruct the
@ -1872,11 +1880,11 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
deletedTxs types.Transactions deletedTxs types.Transactions
addedTxs types.Transactions addedTxs types.Transactions
deletedLogs []*types.Log deletedLogs [][]*types.Log
rebirthLogs []*types.Log rebirthLogs [][]*types.Log
// collectLogs collects the logs that were generated during the // collectLogs collects the logs that were generated or removed during
// processing of the block that corresponds with the given hash. // the processing of the block that corresponds with the given hash.
// These logs are later announced as deleted or reborn // These logs are later announced as deleted or reborn
collectLogs = func(hash common.Hash, removed bool) { collectLogs = func(hash common.Hash, removed bool) {
number := bc.hc.GetBlockNumber(hash) number := bc.hc.GetBlockNumber(hash)
@ -1884,17 +1892,39 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
return return
} }
receipts := rawdb.ReadReceipts(bc.db, hash, *number, bc.chainConfig) receipts := rawdb.ReadReceipts(bc.db, hash, *number, bc.chainConfig)
var logs []*types.Log
for _, receipt := range receipts { for _, receipt := range receipts {
for _, log := range receipt.Logs { for _, log := range receipt.Logs {
l := *log l := *log
if removed { if removed {
l.Removed = true l.Removed = true
deletedLogs = append(deletedLogs, &l)
} else { } else {
rebirthLogs = append(rebirthLogs, &l) }
logs = append(logs, &l)
}
}
if len(logs) > 0 {
if removed {
deletedLogs = append(deletedLogs, logs)
} else {
rebirthLogs = append(rebirthLogs, logs)
} }
} }
} }
// mergeLogs returns a merged log slice with specified sort order.
mergeLogs = func(logs [][]*types.Log, reverse bool) []*types.Log {
var ret []*types.Log
if reverse {
for i := len(logs) - 1; i >= 0; i-- {
ret = append(ret, logs[i]...)
}
} else {
for i := 0; i < len(logs); i++ {
ret = append(ret, logs[i]...)
}
}
return ret
} }
) )
// Reduce the longer chain to the same number as the shorter one // Reduce the longer chain to the same number as the shorter one
@ -1990,47 +2020,20 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
// this goroutine if there are no events to fire, but realistcally that only // this goroutine if there are no events to fire, but realistcally that only
// ever happens if we're reorging empty blocks, which will only happen on idle // ever happens if we're reorging empty blocks, which will only happen on idle
// networks where performance is not an issue either way. // networks where performance is not an issue either way.
//
// TODO(karalabe): Can we get rid of the goroutine somehow to guarantee correct
// event ordering?
go func() {
if len(deletedLogs) > 0 { if len(deletedLogs) > 0 {
bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) bc.rmLogsFeed.Send(RemovedLogsEvent{mergeLogs(deletedLogs, true)})
} }
if len(rebirthLogs) > 0 { if len(rebirthLogs) > 0 {
bc.logsFeed.Send(rebirthLogs) bc.logsFeed.Send(mergeLogs(rebirthLogs, false))
} }
if len(oldChain) > 0 { if len(oldChain) > 0 {
for _, block := range oldChain { for i := len(oldChain) - 1; i >= 0; i-- {
bc.chainSideFeed.Send(ChainSideEvent{Block: block}) bc.chainSideFeed.Send(ChainSideEvent{Block: oldChain[i]})
} }
} }
}()
return nil return nil
} }
// PostChainEvents iterates over the events generated by a chain insertion and
// posts them into the event feed.
// TODO: Should not expose PostChainEvents. The chain events should be posted in WriteBlock.
func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) {
// post event logs for further processing
if logs != nil {
bc.logsFeed.Send(logs)
}
for _, event := range events {
switch ev := event.(type) {
case ChainEvent:
bc.chainFeed.Send(ev)
case ChainHeadEvent:
bc.chainHeadFeed.Send(ev)
case ChainSideEvent:
bc.chainSideFeed.Send(ev)
}
}
}
func (bc *BlockChain) update() { func (bc *BlockChain) update() {
futureTimer := time.NewTicker(5 * time.Second) futureTimer := time.NewTicker(5 * time.Second)
defer futureTimer.Stop() defer futureTimer.Stop()

View File

@ -22,6 +22,7 @@ import (
"math/big" "math/big"
"math/rand" "math/rand"
"os" "os"
"reflect"
"sync" "sync"
"testing" "testing"
"time" "time"
@ -960,16 +961,20 @@ func TestLogReorgs(t *testing.T) {
} }
chain, _ = GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 3, func(i int, gen *BlockGen) {}) chain, _ = GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 3, func(i int, gen *BlockGen) {})
if _, err := blockchain.InsertChain(chain); err != nil { done := make(chan struct{})
t.Fatalf("failed to insert forked chain: %v", err) go func() {
} ev := <-rmLogsCh
timeout := time.NewTimer(1 * time.Second)
select {
case ev := <-rmLogsCh:
if len(ev.Logs) == 0 { if len(ev.Logs) == 0 {
t.Error("expected logs") t.Error("expected logs")
} }
close(done)
}()
if _, err := blockchain.InsertChain(chain); err != nil {
t.Fatalf("failed to insert forked chain: %v", err)
}
timeout := time.NewTimer(1 * time.Second)
select {
case <-done:
case <-timeout.C: case <-timeout.C:
t.Fatal("Timeout. There is no RemovedLogsEvent has been sent.") t.Fatal("Timeout. There is no RemovedLogsEvent has been sent.")
} }
@ -987,34 +992,42 @@ func TestLogRebirth(t *testing.T) {
genesis = gspec.MustCommit(db) genesis = gspec.MustCommit(db)
signer = types.NewEIP155Signer(gspec.Config.ChainID) signer = types.NewEIP155Signer(gspec.Config.ChainID)
newLogCh = make(chan bool) newLogCh = make(chan bool)
removeLogCh = make(chan bool)
) )
// listenNewLog checks whether the received logs number is equal with expected. // validateLogEvent checks whether the received logs number is equal with expected.
listenNewLog := func(sink chan []*types.Log, expect int) { validateLogEvent := func(sink interface{}, result chan bool, expect int) {
chanval := reflect.ValueOf(sink)
chantyp := chanval.Type()
if chantyp.Kind() != reflect.Chan || chantyp.ChanDir()&reflect.RecvDir == 0 {
t.Fatalf("invalid channel, given type %v", chantyp)
}
cnt := 0 cnt := 0
var recv []reflect.Value
timeout := time.After(1 * time.Second)
cases := []reflect.SelectCase{{Chan: chanval, Dir: reflect.SelectRecv}, {Chan: reflect.ValueOf(timeout), Dir: reflect.SelectRecv}}
for { for {
select { chose, v, _ := reflect.Select(cases)
case logs := <-sink: if chose == 1 {
cnt += len(logs) // Not enough event received
case <-time.NewTimer(5 * time.Second).C: result <- false
// new logs timeout
newLogCh <- false
return return
} }
cnt += 1
recv = append(recv, v)
if cnt == expect { if cnt == expect {
break break
} else if cnt > expect {
// redundant logs received
newLogCh <- false
return
} }
} }
select { done := time.After(50 * time.Millisecond)
case <-sink: cases = cases[:1]
// redundant logs received cases = append(cases, reflect.SelectCase{Chan: reflect.ValueOf(done), Dir: reflect.SelectRecv})
newLogCh <- false chose, _, _ := reflect.Select(cases)
case <-time.NewTimer(100 * time.Millisecond).C: // If chose equal 0, it means receiving redundant events.
newLogCh <- true if chose == 1 {
result <- true
} else {
result <- false
} }
} }
@ -1038,12 +1051,12 @@ func TestLogRebirth(t *testing.T) {
}) })
// Spawn a goroutine to receive log events // Spawn a goroutine to receive log events
go listenNewLog(logsCh, 1) go validateLogEvent(logsCh, newLogCh, 1)
if _, err := blockchain.InsertChain(chain); err != nil { if _, err := blockchain.InsertChain(chain); err != nil {
t.Fatalf("failed to insert chain: %v", err) t.Fatalf("failed to insert chain: %v", err)
} }
if !<-newLogCh { if !<-newLogCh {
t.Fatalf("failed to receive new log event") t.Fatal("failed to receive new log event")
} }
// Generate long reorg chain // Generate long reorg chain
@ -1060,40 +1073,31 @@ func TestLogRebirth(t *testing.T) {
}) })
// Spawn a goroutine to receive log events // Spawn a goroutine to receive log events
go listenNewLog(logsCh, 1) go validateLogEvent(logsCh, newLogCh, 1)
go validateLogEvent(rmLogsCh, removeLogCh, 1)
if _, err := blockchain.InsertChain(forkChain); err != nil { if _, err := blockchain.InsertChain(forkChain); err != nil {
t.Fatalf("failed to insert forked chain: %v", err) t.Fatalf("failed to insert forked chain: %v", err)
} }
if !<-newLogCh { if !<-newLogCh {
t.Fatalf("failed to receive new log event") t.Fatal("failed to receive new log event")
} }
// Ensure removedLog events received if !<-removeLogCh {
select { t.Fatal("failed to receive removed log event")
case ev := <-rmLogsCh:
if len(ev.Logs) == 0 {
t.Error("expected logs")
}
case <-time.NewTimer(1 * time.Second).C:
t.Fatal("Timeout. There is no RemovedLogsEvent has been sent.")
} }
newBlocks, _ := GenerateChain(params.TestChainConfig, chain[len(chain)-1], ethash.NewFaker(), db, 1, func(i int, gen *BlockGen) {}) newBlocks, _ := GenerateChain(params.TestChainConfig, chain[len(chain)-1], ethash.NewFaker(), db, 1, func(i int, gen *BlockGen) {})
go listenNewLog(logsCh, 1) go validateLogEvent(logsCh, newLogCh, 1)
go validateLogEvent(rmLogsCh, removeLogCh, 1)
if _, err := blockchain.InsertChain(newBlocks); err != nil { if _, err := blockchain.InsertChain(newBlocks); err != nil {
t.Fatalf("failed to insert forked chain: %v", err) t.Fatalf("failed to insert forked chain: %v", err)
} }
// Ensure removedLog events received
select {
case ev := <-rmLogsCh:
if len(ev.Logs) == 0 {
t.Error("expected logs")
}
case <-time.NewTimer(1 * time.Second).C:
t.Fatal("Timeout. There is no RemovedLogsEvent has been sent.")
}
// Rebirth logs should omit a newLogEvent // Rebirth logs should omit a newLogEvent
if !<-newLogCh { if !<-newLogCh {
t.Fatalf("failed to receive new log event") t.Fatal("failed to receive new log event")
}
// Ensure removedLog events received
if !<-removeLogCh {
t.Fatal("failed to receive removed log event")
} }
} }
@ -1145,7 +1149,6 @@ func TestSideLogRebirth(t *testing.T) {
logsCh := make(chan []*types.Log) logsCh := make(chan []*types.Log)
blockchain.SubscribeLogsEvent(logsCh) blockchain.SubscribeLogsEvent(logsCh)
chain, _ := GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 2, func(i int, gen *BlockGen) { chain, _ := GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 2, func(i int, gen *BlockGen) {
if i == 1 { if i == 1 {
// Higher block difficulty // Higher block difficulty

View File

@ -590,7 +590,7 @@ func (w *worker) resultLoop() {
logs = append(logs, receipt.Logs...) logs = append(logs, receipt.Logs...)
} }
// Commit block and state to database. // Commit block and state to database.
stat, err := w.chain.WriteBlockWithState(block, receipts, task.state) _, err := w.chain.WriteBlockWithState(block, receipts, logs, task.state, true)
if err != nil { if err != nil {
log.Error("Failed writing block to chain", "err", err) log.Error("Failed writing block to chain", "err", err)
continue continue
@ -601,16 +601,6 @@ func (w *worker) resultLoop() {
// Broadcast the block and announce chain insertion event // Broadcast the block and announce chain insertion event
w.mux.Post(core.NewMinedBlockEvent{Block: block}) w.mux.Post(core.NewMinedBlockEvent{Block: block})
var events []interface{}
switch stat {
case core.CanonStatTy:
events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
events = append(events, core.ChainHeadEvent{Block: block})
case core.SideStatTy:
events = append(events, core.ChainSideEvent{Block: block})
}
w.chain.PostChainEvents(events, logs)
// Insert the block into the set of pending ones to resultLoop for confirmations // Insert the block into the set of pending ones to resultLoop for confirmations
w.unconfirmed.Insert(block.NumberU64(), block.Hash()) w.unconfirmed.Insert(block.NumberU64(), block.Hash())
@ -996,3 +986,11 @@ func (w *worker) commit(uncles []*types.Header, interval func(), update bool, st
} }
return nil return nil
} }
// postSideBlock fires a side chain event, only use it for testing.
func (w *worker) postSideBlock(event core.ChainSideEvent) {
select {
case w.chainSideCh <- event:
case <-w.exitCh:
}
}

View File

@ -149,9 +149,6 @@ func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine
func (b *testWorkerBackend) BlockChain() *core.BlockChain { return b.chain } func (b *testWorkerBackend) BlockChain() *core.BlockChain { return b.chain }
func (b *testWorkerBackend) TxPool() *core.TxPool { return b.txPool } func (b *testWorkerBackend) TxPool() *core.TxPool { return b.txPool }
func (b *testWorkerBackend) PostChainEvents(events []interface{}) {
b.chain.PostChainEvents(events, nil)
}
func (b *testWorkerBackend) newRandomUncle() *types.Block { func (b *testWorkerBackend) newRandomUncle() *types.Block {
var parent *types.Block var parent *types.Block
@ -243,8 +240,8 @@ func testGenerateBlockAndImport(t *testing.T, isClique bool) {
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
b.txPool.AddLocal(b.newRandomTx(true)) b.txPool.AddLocal(b.newRandomTx(true))
b.txPool.AddLocal(b.newRandomTx(false)) b.txPool.AddLocal(b.newRandomTx(false))
b.PostChainEvents([]interface{}{core.ChainSideEvent{Block: b.newRandomUncle()}}) w.postSideBlock(core.ChainSideEvent{Block: b.newRandomUncle()})
b.PostChainEvents([]interface{}{core.ChainSideEvent{Block: b.newRandomUncle()}}) w.postSideBlock(core.ChainSideEvent{Block: b.newRandomUncle()})
select { select {
case e := <-loopErr: case e := <-loopErr:
t.Fatal(e) t.Fatal(e)
@ -295,7 +292,7 @@ func testEmptyWork(t *testing.T, chainConfig *params.ChainConfig, engine consens
} }
w.skipSealHook = func(task *task) bool { return true } w.skipSealHook = func(task *task) bool { return true }
w.fullTaskHook = func() { w.fullTaskHook = func() {
// Aarch64 unit tests are running in a VM on travis, they must // Arch64 unit tests are running in a VM on travis, they must
// be given more time to execute. // be given more time to execute.
time.Sleep(time.Second) time.Sleep(time.Second)
} }
@ -351,7 +348,8 @@ func TestStreamUncleBlock(t *testing.T) {
} }
} }
b.PostChainEvents([]interface{}{core.ChainSideEvent{Block: b.uncleBlock}}) w.postSideBlock(core.ChainSideEvent{Block: b.uncleBlock})
select { select {
case <-taskCh: case <-taskCh:
case <-time.NewTimer(time.Second).C: case <-time.NewTimer(time.Second).C: