core: re-omit new log event when logs rebirth

This commit is contained in:
rjl493456442 2018-12-17 15:23:54 +08:00 committed by Péter Szilágyi
parent d5cae48bae
commit 690bd8a417
No known key found for this signature in database
GPG Key ID: E9AE538CEDF8293D
2 changed files with 224 additions and 9 deletions

View File

@ -1475,11 +1475,11 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
commonBlock *types.Block commonBlock *types.Block
deletedTxs types.Transactions deletedTxs types.Transactions
deletedLogs []*types.Log deletedLogs []*types.Log
rebirthLogs []*types.Log
// collectLogs collects the logs that were generated during the // collectLogs collects the logs that were generated during the
// processing of the block that corresponds with the given hash. // processing of the block that corresponds with the given hash.
// These logs are later announced as deleted. // These logs are later announced as deleted or reborn
collectLogs = func(hash common.Hash) { collectLogs = func(hash common.Hash, removed bool) {
// Coalesce logs and set 'Removed'.
number := bc.hc.GetBlockNumber(hash) number := bc.hc.GetBlockNumber(hash)
if number == nil { if number == nil {
return return
@ -1487,9 +1487,13 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
receipts := rawdb.ReadReceipts(bc.db, hash, *number) receipts := rawdb.ReadReceipts(bc.db, hash, *number)
for _, receipt := range receipts { for _, receipt := range receipts {
for _, log := range receipt.Logs { for _, log := range receipt.Logs {
del := *log l := *log
del.Removed = true if removed {
deletedLogs = append(deletedLogs, &del) l.Removed = true
deletedLogs = append(deletedLogs, &l)
} else {
rebirthLogs = append(rebirthLogs, &l)
}
} }
} }
} }
@ -1502,7 +1506,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
oldChain = append(oldChain, oldBlock) oldChain = append(oldChain, oldBlock)
deletedTxs = append(deletedTxs, oldBlock.Transactions()...) deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
collectLogs(oldBlock.Hash()) collectLogs(oldBlock.Hash(), true)
} }
} else { } else {
// reduce new chain and append new chain blocks for inserting later on // reduce new chain and append new chain blocks for inserting later on
@ -1526,7 +1530,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
oldChain = append(oldChain, oldBlock) oldChain = append(oldChain, oldBlock)
newChain = append(newChain, newBlock) newChain = append(newChain, newBlock)
deletedTxs = append(deletedTxs, oldBlock.Transactions()...) deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
collectLogs(oldBlock.Hash()) collectLogs(oldBlock.Hash(), true)
oldBlock, newBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1), bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) oldBlock, newBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1), bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1)
if oldBlock == nil { if oldBlock == nil {
@ -1552,6 +1556,10 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
for i := len(newChain) - 1; i >= 0; i-- { for i := len(newChain) - 1; i >= 0; i-- {
// insert the block in the canonical way, re-writing history // insert the block in the canonical way, re-writing history
bc.insert(newChain[i]) bc.insert(newChain[i])
// collect reborn logs due to chain reorg(except head block)
if i != 0 {
collectLogs(newChain[i].Hash(), false)
}
// write lookup entries for hash based transaction/receipt searches // write lookup entries for hash based transaction/receipt searches
rawdb.WriteTxLookupEntries(bc.db, newChain[i]) rawdb.WriteTxLookupEntries(bc.db, newChain[i])
addedTxs = append(addedTxs, newChain[i].Transactions()...) addedTxs = append(addedTxs, newChain[i].Transactions()...)
@ -1569,6 +1577,9 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
if len(deletedLogs) > 0 { if len(deletedLogs) > 0 {
go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
} }
if len(rebirthLogs) > 0 {
go bc.logsFeed.Send(rebirthLogs)
}
if len(oldChain) > 0 { if len(oldChain) > 0 {
go func() { go func() {
for _, block := range oldChain { for _, block := range oldChain {

View File

@ -867,7 +867,6 @@ func TestChainTxReorgs(t *testing.T) {
} }
func TestLogReorgs(t *testing.T) { func TestLogReorgs(t *testing.T) {
var ( var (
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr1 = crypto.PubkeyToAddress(key1.PublicKey) addr1 = crypto.PubkeyToAddress(key1.PublicKey)
@ -913,6 +912,211 @@ func TestLogReorgs(t *testing.T) {
} }
} }
func TestLogRebirth(t *testing.T) {
var (
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
db = ethdb.NewMemDatabase()
// this code generates a log
code = common.Hex2Bytes("60606040525b7f24ec1d3ff24c2f6ff210738839dbc339cd45a5294d85c79361016243157aae7b60405180905060405180910390a15b600a8060416000396000f360606040526008565b00")
gspec = &Genesis{Config: params.TestChainConfig, Alloc: GenesisAlloc{addr1: {Balance: big.NewInt(10000000000000)}}}
genesis = gspec.MustCommit(db)
signer = types.NewEIP155Signer(gspec.Config.ChainID)
newLogCh = make(chan bool)
)
// listenNewLog checks whether the received logs number is equal with expected.
listenNewLog := func(sink chan []*types.Log, expect int) {
cnt := 0
for {
select {
case logs := <-sink:
cnt += len(logs)
case <-time.NewTimer(5 * time.Second).C:
// new logs timeout
newLogCh <- false
return
}
if cnt == expect {
break
} else if cnt > expect {
// redundant logs received
newLogCh <- false
return
}
}
select {
case <-sink:
// redundant logs received
newLogCh <- false
case <-time.NewTimer(100 * time.Millisecond).C:
newLogCh <- true
}
}
blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil)
defer blockchain.Stop()
logsCh := make(chan []*types.Log)
blockchain.SubscribeLogsEvent(logsCh)
rmLogsCh := make(chan RemovedLogsEvent)
blockchain.SubscribeRemovedLogsEvent(rmLogsCh)
chain, _ := GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 2, func(i int, gen *BlockGen) {
if i == 1 {
tx, err := types.SignTx(types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), 1000000, new(big.Int), code), signer, key1)
if err != nil {
t.Fatalf("failed to create tx: %v", err)
}
gen.AddTx(tx)
}
})
// Spawn a goroutine to receive log events
go listenNewLog(logsCh, 1)
if _, err := blockchain.InsertChain(chain); err != nil {
t.Fatalf("failed to insert chain: %v", err)
}
if !<-newLogCh {
t.Fatalf("failed to receive new log event")
}
// Generate long reorg chain
forkChain, _ := GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 2, func(i int, gen *BlockGen) {
if i == 1 {
tx, err := types.SignTx(types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), 1000000, new(big.Int), code), signer, key1)
if err != nil {
t.Fatalf("failed to create tx: %v", err)
}
gen.AddTx(tx)
// Higher block difficulty
gen.OffsetTime(-9)
}
})
// Spawn a goroutine to receive log events
go listenNewLog(logsCh, 1)
if _, err := blockchain.InsertChain(forkChain); err != nil {
t.Fatalf("failed to insert forked chain: %v", err)
}
if !<-newLogCh {
t.Fatalf("failed to receive new log event")
}
// Ensure removedLog events received
select {
case ev := <-rmLogsCh:
if len(ev.Logs) == 0 {
t.Error("expected logs")
}
case <-time.NewTimer(1 * time.Second).C:
t.Fatal("Timeout. There is no RemovedLogsEvent has been sent.")
}
newBlocks, _ := GenerateChain(params.TestChainConfig, chain[len(chain)-1], ethash.NewFaker(), db, 1, func(i int, gen *BlockGen) {})
go listenNewLog(logsCh, 1)
if _, err := blockchain.InsertChain(newBlocks); err != nil {
t.Fatalf("failed to insert forked chain: %v", err)
}
// Rebirth logs should omit a newLogEvent
if !<-newLogCh {
t.Fatalf("failed to receive new log event")
}
// Ensure removedLog events received
select {
case ev := <-rmLogsCh:
if len(ev.Logs) == 0 {
t.Error("expected logs")
}
case <-time.NewTimer(1 * time.Second).C:
t.Fatal("Timeout. There is no RemovedLogsEvent has been sent.")
}
}
func TestSideLogRebirth(t *testing.T) {
var (
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
db = ethdb.NewMemDatabase()
// this code generates a log
code = common.Hex2Bytes("60606040525b7f24ec1d3ff24c2f6ff210738839dbc339cd45a5294d85c79361016243157aae7b60405180905060405180910390a15b600a8060416000396000f360606040526008565b00")
gspec = &Genesis{Config: params.TestChainConfig, Alloc: GenesisAlloc{addr1: {Balance: big.NewInt(10000000000000)}}}
genesis = gspec.MustCommit(db)
signer = types.NewEIP155Signer(gspec.Config.ChainID)
newLogCh = make(chan bool)
)
// listenNewLog checks whether the received logs number is equal with expected.
listenNewLog := func(sink chan []*types.Log, expect int) {
cnt := 0
for {
select {
case logs := <-sink:
cnt += len(logs)
case <-time.NewTimer(5 * time.Second).C:
// new logs timeout
newLogCh <- false
return
}
if cnt == expect {
break
} else if cnt > expect {
// redundant logs received
newLogCh <- false
return
}
}
select {
case <-sink:
// redundant logs received
newLogCh <- false
case <-time.NewTimer(100 * time.Millisecond).C:
newLogCh <- true
}
}
blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil)
defer blockchain.Stop()
logsCh := make(chan []*types.Log)
blockchain.SubscribeLogsEvent(logsCh)
chain, _ := GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 2, func(i int, gen *BlockGen) {
if i == 1 {
// Higher block difficulty
gen.OffsetTime(-9)
}
})
if _, err := blockchain.InsertChain(chain); err != nil {
t.Fatalf("failed to insert forked chain: %v", err)
}
// Generate side chain with lower difficulty
sideChain, _ := GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 2, func(i int, gen *BlockGen) {
if i == 1 {
tx, err := types.SignTx(types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), 1000000, new(big.Int), code), signer, key1)
if err != nil {
t.Fatalf("failed to create tx: %v", err)
}
gen.AddTx(tx)
}
})
if _, err := blockchain.InsertChain(sideChain); err != nil {
t.Fatalf("failed to insert forked chain: %v", err)
}
// Generate a new block based on side chain
newBlocks, _ := GenerateChain(params.TestChainConfig, sideChain[len(sideChain)-1], ethash.NewFaker(), db, 1, func(i int, gen *BlockGen) {})
go listenNewLog(logsCh, 1)
if _, err := blockchain.InsertChain(newBlocks); err != nil {
t.Fatalf("failed to insert forked chain: %v", err)
}
// Rebirth logs should omit a newLogEvent
if !<-newLogCh {
t.Fatalf("failed to receive new log event")
}
}
func TestReorgSideEvent(t *testing.T) { func TestReorgSideEvent(t *testing.T) {
var ( var (
db = rawdb.NewMemoryDatabase() db = rawdb.NewMemoryDatabase()