fix blockpool deadlock

- do not break from headsection on error
[remove peer after protocol quit will close switchC, until then head block can arrive and block on channel while keeping peers lock causing a deadlock.]
- more careful locking in AddBlock
This commit is contained in:
zelig 2015-04-07 18:53:05 +01:00
parent 30830652ae
commit 42fb9652f5
3 changed files with 22 additions and 12 deletions

View File

@ -624,6 +624,7 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) {
entry := self.get(hash) entry := self.get(hash)
// a peer's current head block is appearing the first time // a peer's current head block is appearing the first time
sender.lock.Lock()
if hash == sender.currentBlockHash { if hash == sender.currentBlockHash {
if sender.currentBlock == nil { if sender.currentBlock == nil {
plog.Debugf("AddBlock: add head block %s for peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash)) plog.Debugf("AddBlock: add head block %s for peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash))
@ -634,16 +635,28 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) {
self.status.values.Blocks++ self.status.values.Blocks++
self.status.values.BlocksInPool++ self.status.values.BlocksInPool++
self.status.lock.Unlock() self.status.lock.Unlock()
select {
case sender.currentBlockC <- block:
case <-sender.switchC:
}
} else { } else {
plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) already known", hex(hash), peerId, hex(sender.currentBlockHash)) plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) already known", hex(hash), peerId, hex(sender.currentBlockHash))
// signal to head section process // signal to head section process
sender.currentBlockC <- block
} }
// self.wg.Add(1)
// go func() {
// timeout := time.After(1 * time.Second)
// select {
// case sender.currentBlockC <- block:
// case <-timeout:
// }
// self.wg.Done()
// }()
} else { } else {
plog.DebugDetailf("AddBlock: block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash)) plog.DebugDetailf("AddBlock: block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash))
sender.lock.Lock()
// update peer chain info if more recent than what we registered // update peer chain info if more recent than what we registered
if block.Td != nil && block.Td.Cmp(sender.td) > 0 { if block.Td != nil && block.Td.Cmp(sender.td) > 0 {
sender.td = block.Td sender.td = block.Td
@ -652,7 +665,6 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) {
sender.currentBlock = block sender.currentBlock = block
sender.headSection = nil sender.headSection = nil
} }
sender.lock.Unlock()
/* @zelig !!! /* @zelig !!!
requested 5 hashes from both A & B. A responds sooner then B, process blocks. Close section. requested 5 hashes from both A & B. A responds sooner then B, process blocks. Close section.
@ -668,6 +680,8 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) {
} }
*/ */
} }
sender.lock.Unlock()
if entry == nil { if entry == nil {
return return
} }

View File

@ -7,6 +7,10 @@ import (
"github.com/ethereum/go-ethereum/blockpool/test" "github.com/ethereum/go-ethereum/blockpool/test"
) )
func init() {
test.LogInit()
}
// using the mock framework in blockpool_util_test // using the mock framework in blockpool_util_test
// we test various scenarios here // we test various scenarios here

View File

@ -142,9 +142,8 @@ func (self *peer) setChainInfo(td *big.Int, c common.Hash) {
self.headSection = nil self.headSection = nil
} }
// caller must hold peer lock
func (self *peer) setChainInfoFromBlock(block *types.Block) { func (self *peer) setChainInfoFromBlock(block *types.Block) {
self.lock.Lock()
defer self.lock.Unlock()
// use the optional TD to update peer td, this helps second best peer selection // use the optional TD to update peer td, this helps second best peer selection
// in case best peer is lost // in case best peer is lost
if block.Td != nil && block.Td.Cmp(self.td) > 0 { if block.Td != nil && block.Td.Cmp(self.td) > 0 {
@ -155,11 +154,6 @@ func (self *peer) setChainInfoFromBlock(block *types.Block) {
self.currentBlock = block self.currentBlock = block
self.headSection = nil self.headSection = nil
} }
self.bp.wg.Add(1)
go func() {
self.currentBlockC <- block
self.bp.wg.Done()
}()
} }
// distribute block request among known peers // distribute block request among known peers
@ -571,7 +565,6 @@ LOOP:
self.bp.status.badPeers[self.id]++ self.bp.status.badPeers[self.id]++
self.bp.status.lock.Unlock() self.bp.status.lock.Unlock()
// there is no persistence here, so GC will just take care of cleaning up // there is no persistence here, so GC will just take care of cleaning up
break LOOP
// signal for peer switch, quit // signal for peer switch, quit
case <-switchC: case <-switchC:
@ -594,7 +587,6 @@ LOOP:
self.bp.status.badPeers[self.id]++ self.bp.status.badPeers[self.id]++
self.bp.status.lock.Unlock() self.bp.status.lock.Unlock()
plog.Debugf("HeadSection: <%s> (headsection [%s]) quit channel closed : timed out without providing new blocks...quitting", self.id, sectionhex(self.headSection)) plog.Debugf("HeadSection: <%s> (headsection [%s]) quit channel closed : timed out without providing new blocks...quitting", self.id, sectionhex(self.headSection))
break LOOP
} }
} }
if !self.idle { if !self.idle {