Patch for concurrent iterator & others (onto v1.11.6) #386

Closed
roysc wants to merge 1565 commits from v1.11.6-statediff-v5 into master
2 changed files with 43 additions and 13 deletions
Showing only changes of commit 86af788790 - Show all commits

View File

@ -1965,8 +1965,8 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
oldChain types.Blocks oldChain types.Blocks
commonBlock *types.Block commonBlock *types.Block
deletedTxs types.Transactions deletedTxs []common.Hash
addedTxs types.Transactions addedTxs []common.Hash
deletedLogs [][]*types.Log deletedLogs [][]*types.Log
rebirthLogs [][]*types.Log rebirthLogs [][]*types.Log
@ -1976,7 +1976,9 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
// Old chain is longer, gather all transactions and logs as deleted ones // Old chain is longer, gather all transactions and logs as deleted ones
for ; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) { for ; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) {
oldChain = append(oldChain, oldBlock) oldChain = append(oldChain, oldBlock)
deletedTxs = append(deletedTxs, oldBlock.Transactions()...) for _, tx := range oldBlock.Transactions() {
deletedTxs = append(deletedTxs, tx.Hash())
}
// Collect deleted logs for notification // Collect deleted logs for notification
logs := bc.collectLogs(oldBlock.Hash(), true) logs := bc.collectLogs(oldBlock.Hash(), true)
@ -2006,7 +2008,9 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
} }
// Remove an old block as well as stash away a new block // Remove an old block as well as stash away a new block
oldChain = append(oldChain, oldBlock) oldChain = append(oldChain, oldBlock)
deletedTxs = append(deletedTxs, oldBlock.Transactions()...) for _, tx := range oldBlock.Transactions() {
deletedTxs = append(deletedTxs, tx.Hash())
}
// Collect deleted logs for notification // Collect deleted logs for notification
logs := bc.collectLogs(oldBlock.Hash(), true) logs := bc.collectLogs(oldBlock.Hash(), true)
@ -2025,6 +2029,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
return fmt.Errorf("invalid new chain") return fmt.Errorf("invalid new chain")
} }
} }
// Ensure the user sees large reorgs // Ensure the user sees large reorgs
if len(oldChain) > 0 && len(newChain) > 0 { if len(oldChain) > 0 && len(newChain) > 0 {
logFn := log.Info logFn := log.Info
@ -2041,7 +2046,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
} else if len(newChain) > 0 { } else if len(newChain) > 0 {
// Special case happens in the post merge stage that current head is // Special case happens in the post merge stage that current head is
// the ancestor of new head while these two blocks are not consecutive // the ancestor of new head while these two blocks are not consecutive
log.Info("Extend chain", "add", len(newChain), "number", newChain[0].NumberU64(), "hash", newChain[0].Hash()) log.Info("Extend chain", "add", len(newChain), "number", newChain[0].Number(), "hash", newChain[0].Hash())
blockReorgAddMeter.Mark(int64(len(newChain))) blockReorgAddMeter.Mark(int64(len(newChain)))
} else { } else {
// len(newChain) == 0 && len(oldChain) > 0 // len(newChain) == 0 && len(oldChain) > 0
@ -2054,19 +2059,17 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
// Insert the block in the canonical way, re-writing history // Insert the block in the canonical way, re-writing history
bc.writeHeadBlock(newChain[i]) bc.writeHeadBlock(newChain[i])
// Collect reborn logs due to chain reorg
logs := bc.collectLogs(newChain[i].Hash(), false)
if len(logs) > 0 {
rebirthLogs = append(rebirthLogs, logs)
}
// Collect the new added transactions. // Collect the new added transactions.
addedTxs = append(addedTxs, newChain[i].Transactions()...) for _, tx := range newChain[i].Transactions() {
addedTxs = append(addedTxs, tx.Hash())
} }
}
// Delete useless indexes right now which includes the non-canonical // Delete useless indexes right now which includes the non-canonical
// transaction indexes, canonical chain indexes which above the head. // transaction indexes, canonical chain indexes which above the head.
indexesBatch := bc.db.NewBatch() indexesBatch := bc.db.NewBatch()
for _, tx := range types.TxDifference(deletedTxs, addedTxs) { for _, tx := range types.HashDifference(deletedTxs, addedTxs) {
rawdb.DeleteTxLookupEntry(indexesBatch, tx.Hash()) rawdb.DeleteTxLookupEntry(indexesBatch, tx)
} }
// Delete any canonical number assignments above the new head // Delete any canonical number assignments above the new head
number := bc.CurrentBlock().NumberU64() number := bc.CurrentBlock().NumberU64()
@ -2080,6 +2083,15 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
if err := indexesBatch.Write(); err != nil { if err := indexesBatch.Write(); err != nil {
log.Crit("Failed to delete useless indexes", "err", err) log.Crit("Failed to delete useless indexes", "err", err)
} }
// Collect the logs
for i := len(newChain) - 1; i >= 1; i-- {
// Collect reborn logs due to chain reorg
logs := bc.collectLogs(newChain[i].Hash(), false)
if len(logs) > 0 {
rebirthLogs = append(rebirthLogs, logs)
}
}
// If any logs need to be fired, do it now. In theory we could avoid creating // If any logs need to be fired, do it now. In theory we could avoid creating
// this goroutine if there are no events to fire, but realistcally that only // this goroutine if there are no events to fire, but realistcally that only
// ever happens if we're reorging empty blocks, which will only happen on idle // ever happens if we're reorging empty blocks, which will only happen on idle

View File

@ -432,6 +432,24 @@ func TxDifference(a, b Transactions) Transactions {
return keep return keep
} }
// HashDifference returns a new set which is the difference between a and b.
func HashDifference(a, b []common.Hash) []common.Hash {
keep := make([]common.Hash, 0, len(a))
remove := make(map[common.Hash]struct{})
for _, hash := range b {
remove[hash] = struct{}{}
}
for _, hash := range a {
if _, ok := remove[hash]; !ok {
keep = append(keep, hash)
}
}
return keep
}
// TxByNonce implements the sort interface to allow sorting a list of transactions // TxByNonce implements the sort interface to allow sorting a list of transactions
// by their nonces. This is usually only useful for sorting transactions from a // by their nonces. This is usually only useful for sorting transactions from a
// single account, otherwise a nonce comparison doesn't make much sense. // single account, otherwise a nonce comparison doesn't make much sense.