Merge branch 'ethersphere-blockpool2' into poc-9

obscuren 2015-03-03 20:27:15 +01:00
commit 2707891966
5 changed files with 89 additions and 51 deletions

View File

@@ -17,7 +17,7 @@ var plog = ethlogger.NewLogger("Blockpool")
 var (
 	// max number of block hashes sent in one request
-	blockHashesBatchSize = 512
+	blockHashesBatchSize = 256
 	// max number of blocks sent in one request
 	blockBatchSize = 64
 	// interval between two consecutive block checks (and requests)
@@ -27,11 +27,13 @@ var (
 	// interval between two consecutive block hash checks (and requests)
 	blockHashesRequestInterval = 3 * time.Second
 	// max number of idle iterations, ie., check through a section without new blocks coming in
-	blocksRequestMaxIdleRounds = 100
+	blocksRequestMaxIdleRounds = 20
 	// timeout interval: max time allowed for peer without sending a block hash
 	blockHashesTimeout = 60 * time.Second
 	// timeout interval: max time allowed for peer without sending a block
-	blocksTimeout = 120 * time.Second
+	blocksTimeout = 60 * time.Second
+	//
+	idleBestPeerTimeout = 60 * time.Second
 )

 // config embedded in components, by default fall back to constants
@@ -45,6 +47,7 @@ type Config struct {
 	BlocksRequestInterval time.Duration
 	BlockHashesTimeout time.Duration
 	BlocksTimeout time.Duration
+	IdleBestPeerTimeout time.Duration
 }

 // blockpool errors
@@ -53,6 +56,7 @@ const (
 	ErrInvalidPoW
 	ErrUnrequestedBlock
 	ErrInsufficientChainInfo
+	ErrIdleTooLong
 )

 var errorToString = map[int]string{
@@ -60,6 +64,7 @@ var errorToString = map[int]string{
 	ErrInvalidPoW: "Invalid PoW",
 	ErrUnrequestedBlock: "Unrequested block",
 	ErrInsufficientChainInfo: "Insufficient chain info",
+	ErrIdleTooLong: "Idle too long",
 }

 // init initialises all your laundry
@@ -88,6 +93,9 @@ func (self *Config) init() {
 	if self.BlocksTimeout == 0 {
 		self.BlocksTimeout = blocksTimeout
 	}
+	if self.IdleBestPeerTimeout == 0 {
+		self.IdleBestPeerTimeout = idleBestPeerTimeout
+	}
 }

 // node is the basic unit of the internal model of block chain/tree in the blockpool
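The IdleBestPeerTimeout default added to init() follows the same zero-value-fallback convention as the other settings: a caller that leaves the field unset gets the package constant. A minimal, runnable sketch of the pattern, reduced to two timeout fields (field names follow the diff, everything else is illustrative):

package main

import (
	"fmt"
	"time"
)

const idleBestPeerTimeout = 60 * time.Second

// Config mirrors only the timeout fields relevant to this change.
type Config struct {
	BlocksTimeout       time.Duration
	IdleBestPeerTimeout time.Duration
}

// init fills in package defaults for any field left at its zero value.
func (self *Config) init() {
	if self.BlocksTimeout == 0 {
		self.BlocksTimeout = 60 * time.Second
	}
	if self.IdleBestPeerTimeout == 0 {
		self.IdleBestPeerTimeout = idleBestPeerTimeout
	}
}

func main() {
	c := &Config{BlocksTimeout: 90 * time.Second} // IdleBestPeerTimeout left unset
	c.init()
	fmt.Println(c.BlocksTimeout, c.IdleBestPeerTimeout) // 1m30s 1m0s
}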
@@ -149,6 +157,15 @@ func New(
 	}
 }

+func severity(code int) ethlogger.LogLevel {
+	switch code {
+	case ErrUnrequestedBlock:
+		return ethlogger.WarnLevel
+	default:
+		return ethlogger.ErrorLevel
+	}
+}
+
 // allows restart
 func (self *BlockPool) Start() {
 	self.lock.Lock()
@@ -169,6 +186,7 @@ func (self *BlockPool) Start() {
 		errors: &errs.Errors{
 			Package: "Blockpool",
 			Errors: errorToString,
+			Level: severity,
 		},
 		peers: make(map[string]*peer),
 		status: self.status,
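Taken together with the severity function above, the Level hook lets the error registry log ErrUnrequestedBlock, which an over-eager peer can trigger legitimately, as a warning while every other blockpool error stays an error. The errs package internals are not part of this diff, so the following is only a rough, self-contained illustration of that code-to-level mapping, not the real API:

package main

import "log"

type LogLevel int

const (
	WarnLevel LogLevel = iota
	ErrorLevel
)

const (
	ErrUnrequestedBlock = iota
	ErrInsufficientChainInfo
)

// severity mirrors the function from the diff: expected misbehaviour is
// downgraded to a warning, everything else is treated as an error.
func severity(code int) LogLevel {
	if code == ErrUnrequestedBlock {
		return WarnLevel
	}
	return ErrorLevel
}

// report stands in for an error registry consulting its Level hook before logging.
func report(code int, msg string) {
	if severity(code) == WarnLevel {
		log.Printf("WARN: %s", msg)
		return
	}
	log.Printf("ERROR: %s", msg)
}

func main() {
	report(ErrUnrequestedBlock, "Unrequested block")
	report(ErrInsufficientChainInfo, "Insufficient chain info")
}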
@@ -363,6 +381,8 @@ LOOP:
 			// check if known block connecting the downloaded chain to our blockchain
 			plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) found block %s in the blockchain", peerId, hex(bestpeer.currentBlockHash), hex(hash))
 			if len(nodes) == 1 {
+				plog.DebugDetailf("AddBlockHashes: singleton section pushed to blockchain peer <%s> (head: %s) found block %s in the blockchain", peerId, hex(bestpeer.currentBlockHash), hex(hash))
+
 				// create new section if needed and push it to the blockchain
 				sec = self.newSection(nodes)
 				sec.addSectionToBlockChain(bestpeer)
@@ -379,6 +399,8 @@ LOOP:
 				and td together with blockBy are recorded on the node
 			*/
 			if len(nodes) == 0 && child != nil {
+				plog.DebugDetailf("AddBlockHashes: child section [%s] pushed to blockchain peer <%s> (head: %s) found block %s in the blockchain", sectionhex(child), peerId, hex(bestpeer.currentBlockHash), hex(hash))
+
 				child.addSectionToBlockChain(bestpeer)
 			}
 		}
@@ -446,10 +468,12 @@ LOOP:
 		*/
 		sec = self.linkSections(nodes, parent, child)

-		self.status.lock.Lock()
-		self.status.values.BlockHashes += len(nodes)
-		self.status.lock.Unlock()
-		plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s): section [%s] created", peerId, hex(bestpeer.currentBlockHash), sectionhex(sec))
+		if sec != nil {
+			self.status.lock.Lock()
+			self.status.values.BlockHashes += len(nodes)
+			self.status.lock.Unlock()
+			plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s): section [%s] created", peerId, hex(bestpeer.currentBlockHash), sectionhex(sec))
+		}

 		self.chainLock.Unlock()
@@ -549,6 +573,7 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) {
 			self.status.lock.Unlock()
 		} else {
 			plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) already known", hex(hash), peerId, hex(sender.currentBlockHash))
+			sender.currentBlockC <- block
 		}
 	} else {
@@ -644,11 +669,15 @@ LOOP:
 			we need to relink both complete and incomplete sections
 			the latter could have been blockHashesRequestsComplete before being delinked from its parent
 		*/
-		if parent == nil && sec.bottom.block != nil {
-			if entry := self.get(sec.bottom.block.ParentHash()); entry != nil {
-				parent = entry.section
-				plog.DebugDetailf("activateChain: [%s]-[%s] relink", sectionhex(parent), sectionhex(sec))
-				link(parent, sec)
+		if parent == nil {
+			if sec.bottom.block != nil {
+				if entry := self.get(sec.bottom.block.ParentHash()); entry != nil {
+					parent = entry.section
+					plog.DebugDetailf("activateChain: [%s]-[%s] link", sectionhex(parent), sectionhex(sec))
+					link(parent, sec)
+				}
+			} else {
+				plog.DebugDetailf("activateChain: section [%s] activated by peer <%s> has missing root block", sectionhex(sec), p.id)
 			}
 		}
 		sec = parent
@@ -704,9 +733,15 @@ func (self *BlockPool) remove(sec *section) {
 	// delete node entries from pool index under pool lock
 	self.lock.Lock()
 	defer self.lock.Unlock()
 	for _, node := range sec.nodes {
 		delete(self.pool, string(node.hash))
 	}
+
+	if sec.initialised && sec.poolRootIndex != 0 {
+		self.status.lock.Lock()
+		self.status.values.BlocksInPool -= len(sec.nodes) - sec.missing
+		self.status.lock.Unlock()
+	}
 }

 func (self *BlockPool) getHashSlice() (s [][]byte) {

View File

@@ -20,12 +20,13 @@ func TestBlockPoolConfig(t *testing.T) {
 	test.CheckDuration("BlocksRequestInterval", c.BlocksRequestInterval, blocksRequestInterval, t)
 	test.CheckDuration("BlockHashesTimeout", c.BlockHashesTimeout, blockHashesTimeout, t)
 	test.CheckDuration("BlocksTimeout", c.BlocksTimeout, blocksTimeout, t)
+	test.CheckDuration("IdleBestPeerTimeout", c.IdleBestPeerTimeout, idleBestPeerTimeout, t)
 }

 func TestBlockPoolOverrideConfig(t *testing.T) {
 	test.LogInit()
 	blockPool := &BlockPool{Config: &Config{}}
-	c := &Config{128, 32, 1, 0, 300 * time.Millisecond, 100 * time.Millisecond, 90 * time.Second, 0}
+	c := &Config{128, 32, 1, 0, 300 * time.Millisecond, 100 * time.Millisecond, 90 * time.Second, 0, 30 * time.Second}

 	blockPool.Config = c
 	blockPool.Start()
@@ -37,4 +38,5 @@ func TestBlockPoolOverrideConfig(t *testing.T) {
 	test.CheckDuration("BlocksRequestInterval", c.BlocksRequestInterval, 100*time.Millisecond, t)
 	test.CheckDuration("BlockHashesTimeout", c.BlockHashesTimeout, 90*time.Second, t)
 	test.CheckDuration("BlocksTimeout", c.BlocksTimeout, blocksTimeout, t)
+	test.CheckDuration("IdleBestPeerTimeout", c.IdleBestPeerTimeout, 30*time.Second, t)
 }

View File

@@ -37,7 +37,7 @@ type peer struct {
 	currentBlockC chan *types.Block
 	headSectionC chan *section

-	// channels to signal peers witch and peer quit
+	// channels to signal peer switch and peer quit to section processes
 	idleC chan bool
 	switchC chan bool
@@ -47,7 +47,7 @@ type peer struct {
 	// timers for head section process
 	blockHashesRequestTimer <-chan time.Time
 	blocksRequestTimer <-chan time.Time
-	suicide <-chan time.Time
+	suicideC <-chan time.Time

 	idle bool
 }
@@ -214,6 +214,7 @@ func (self *peers) addPeer(
 	// check peer current head
 	if self.bp.hasBlock(currentBlockHash) {
 		// peer not ahead
+		plog.Debugf("addPeer: peer <%v> with td %v and current block %s is behind", id, td, hex(currentBlockHash))
 		return false
 	}
@@ -285,8 +286,7 @@ func (self *peers) removePeer(id string) {
 	}
 }

-// switchPeer launches section processes based on information about
-// shared interest and legacy of peers
+// switchPeer launches section processes
 func (self *BlockPool) switchPeer(oldp, newp *peer) {

 	// first quit AddBlockHashes, requestHeadSection and activateChain
@@ -371,16 +371,16 @@ func (self *peer) handleSection(sec *section) {
 			self.bp.syncing()
 		}

-		self.suicide = time.After(self.bp.Config.BlockHashesTimeout)
+		self.suicideC = time.After(self.bp.Config.BlockHashesTimeout)

 		plog.DebugDetailf("HeadSection: <%s> head block hash changed (mined block received). New head %s", self.id, hex(self.currentBlockHash))
 	} else {
 		if !self.idle {
 			self.idle = true
-			self.suicide = nil
 			self.bp.wg.Done()
 		}

 		plog.DebugDetailf("HeadSection: <%s> head section [%s] created", self.id, sectionhex(sec))
+		self.suicideC = time.After(self.bp.Config.IdleBestPeerTimeout)
 	}
 }
@@ -450,7 +450,7 @@ func (self *peer) getBlockHashes() {
 			self.blockHashesRequestTimer = nil
 			if !self.idle {
 				self.idle = true
-				self.suicide = nil
+				self.suicideC = time.After(self.bp.Config.IdleBestPeerTimeout)
 				self.bp.wg.Done()
 			}
 		}
@@ -466,7 +466,7 @@ func (self *peer) run() {

 	self.blockHashesRequestTimer = nil
 	self.blocksRequestTimer = time.After(0)
-	self.suicide = time.After(self.bp.Config.BlockHashesTimeout)
+	self.suicideC = time.After(self.bp.Config.BlockHashesTimeout)

 	var quit chan bool
@@ -475,9 +475,20 @@ func (self *peer) run() {

 LOOP:
 	for {
 		select {

+		// to monitor section process behaviour
 		case <-ping.C:
 			plog.Debugf("HeadSection: <%s> section with head %s, idle: %v", self.id, hex(self.currentBlockHash), self.idle)

+		// idle timer started when process goes idle
+		case <-self.idleC:
+			if self.idle {
+				self.peerError(self.bp.peers.errors.New(ErrIdleTooLong, "timed out without providing new blocks...quitting", currentBlockHash))
+				self.bp.status.lock.Lock()
+				self.bp.status.badPeers[self.id]++
+				self.bp.status.lock.Unlock()
+			}
+
 		// signal from AddBlockHashes that head section for current best peer is created
 		// if sec == nil, it signals that chain info has updated (new block message)
 		case sec := <-self.headSectionC:
@@ -502,8 +513,8 @@ LOOP:
 			self.getCurrentBlock(nil)

 		// quitting on timeout
-		case <-self.suicide:
-			self.peerError(self.bp.peers.errors.New(ErrInsufficientChainInfo, "timed out without providing block hashes or head block", currentBlockHash))
+		case <-self.suicideC:
+			self.peerError(self.bp.peers.errors.New(ErrInsufficientChainInfo, "timed out without providing block hashes or head block %x", currentBlockHash))

 			self.bp.status.lock.Lock()
 			self.bp.status.badPeers[self.id]++
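The run loop changes above are the heart of the new idle-best-peer policy: instead of disabling the suicide timer when the best peer goes idle, suicideC is re-armed with IdleBestPeerTimeout, and an idle peer that still produces nothing is reported with ErrIdleTooLong and counted in badPeers. A compact, self-contained sketch of that re-armed-timeout pattern (channel and parameter names here are illustrative, not the blockpool's own):

package main

import (
	"fmt"
	"time"
)

// worker sketches the re-armed timeout: while busy the deadline is long,
// once idle it is replaced by a shorter idle timeout; expiry is an error.
func worker(events <-chan string, busyTimeout, idleTimeout time.Duration) error {
	deadline := time.After(busyTimeout)
	idle := false
	for {
		select {
		case ev, ok := <-events:
			if !ok { // peer switched away / quitting
				return nil
			}
			if ev == "idle" {
				idle = true
				deadline = time.After(idleTimeout) // re-arm with the shorter idle timeout
			} else {
				idle = false
				deadline = time.After(busyTimeout)
			}
		case <-deadline:
			if idle {
				return fmt.Errorf("idle too long: no new blocks within %v", idleTimeout)
			}
			return fmt.Errorf("no block hashes or head block within %v", busyTimeout)
		}
	}
}

func main() {
	events := make(chan string, 1)
	events <- "idle"
	// With a tiny idle timeout the worker reports the idle error almost immediately.
	fmt.Println(worker(events, time.Second, 10*time.Millisecond))
}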

View File

@@ -138,7 +138,7 @@ func (self *section) addSectionToBlockChain(p *peer) {
 			plog.Warnf("penalise peers %v (hash), %v (block)", node.hashBy, node.blockBy)

 			// or invalid block and the entire chain needs to be removed
-			self.removeInvalidChain()
+			self.removeChain()
 		} else {
 			// if all blocks inserted in this section
 			// then need to try to insert blocks in child section
@@ -235,16 +235,14 @@ LOOP:

 		// timebomb - if section is not complete in time, nuke the entire chain
 		case <-self.suicideTimer:
-			self.suicide()
+			self.removeChain()
 			plog.Debugf("[%s] timeout. (%v total attempts): missing %v/%v/%v...suicide", sectionhex(self), self.blocksRequests, self.missing, self.lastMissing, self.depth)
 			self.suicideTimer = nil
+			break LOOP

 		// closing suicideC triggers section suicide: removes section nodes from pool and terminates section process
 		case <-self.suicideC:
-			plog.DebugDetailf("[%s] suicide", sectionhex(self))
-			self.unlink()
-			self.bp.remove(self)
-			plog.DebugDetailf("[%s] done", sectionhex(self))
+			plog.DebugDetailf("[%s] quit", sectionhex(self))
 			break LOOP

 		// alarm for checking blocks in the section
@@ -283,7 +281,7 @@ LOOP:
 					checking = false
 					break
 				}
-				plog.DebugDetailf("[%s] section proc step %v: missing %v/%v/%v", sectionhex(self), self.step, self.missing, self.lastMissing, self.depth)
+				// plog.DebugDetailf("[%s] section proc step %v: missing %v/%v/%v", sectionhex(self), self.step, self.missing, self.lastMissing, self.depth)
 				if !checking {
 					self.step = 0
 					self.missing = 0
@@ -522,7 +520,7 @@ func (self *section) checkRound() {
 		// too many idle rounds
 		if self.idle >= self.bp.Config.BlocksRequestMaxIdleRounds {
 			plog.DebugDetailf("[%s] block requests had %v idle rounds (%v total attempts): missing %v/%v/%v\ngiving up...", sectionhex(self), self.idle, self.blocksRequests, self.missing, self.lastMissing, self.depth)
-			self.suicide()
+			self.removeChain()
 		}
 	} else {
 		self.idle = 0
@@ -602,10 +600,12 @@ func (self *BlockPool) linkSections(nodes []*node, parent, child *section) (sec
 		link(parent, sec)
 		link(sec, child)
 	} else {
-		// now this can only happen if we allow response to hash request to include <from> hash
-		// in this case we just link parent and child (without needing root block of child section)
-		plog.Debugf("[%s]->[%s] connecting known sections", sectionhex(parent), sectionhex(child))
-		link(parent, child)
+		if parent != nil && child != nil {
+			// now this can only happen if we allow response to hash request to include <from> hash
+			// in this case we just link parent and child (without needing root block of child section)
+			plog.Debugf("[%s]->[%s] connecting known sections", sectionhex(parent), sectionhex(child))
+			link(parent, child)
+		}
 	}
 	return
 }
@@ -614,6 +614,7 @@ func (self *section) activate(p *peer) {
 	self.bp.wg.Add(1)
 	select {
 	case <-self.offC:
+		plog.DebugDetailf("[%s] completed section process. cannot activate for peer <%s>", sectionhex(self), p.id)
 		self.bp.wg.Done()
 	case self.controlC <- p:
 		plog.DebugDetailf("[%s] activate section process for peer <%s>", sectionhex(self), p.id)
@@ -625,22 +626,10 @@ func (self *section) deactivate() {
 	self.controlC <- nil
 }

-func (self *section) suicide() {
-	select {
-	case <-self.suicideC:
-		return
-	default:
-	}
-	close(self.suicideC)
-}
-
 // removes this section exacly
 func (self *section) remove() {
 	select {
 	case <-self.offC:
-		// section is complete, no process
-		self.unlink()
-		self.bp.remove(self)
 		close(self.suicideC)
 		plog.DebugDetailf("[%s] remove: suicide", sectionhex(self))
 	case <-self.suicideC:
@@ -649,21 +638,23 @@ func (self *section) remove() {
 		plog.DebugDetailf("[%s] remove: suicide", sectionhex(self))
 		close(self.suicideC)
 	}
+
+	self.unlink()
+	self.bp.remove(self)
 	plog.DebugDetailf("[%s] removed section.", sectionhex(self))
 }

 // remove a section and all its descendents from the pool
-func (self *section) removeInvalidChain() {
+func (self *section) removeChain() {
 	// need to get the child before removeSection delinks the section
 	self.bp.chainLock.RLock()
 	child := self.child
 	self.bp.chainLock.RUnlock()

-	plog.DebugDetailf("[%s] remove invalid chain", sectionhex(self))
+	plog.DebugDetailf("[%s] remove chain", sectionhex(self))
 	self.remove()
 	if child != nil {
-		child.removeInvalidChain()
+		child.removeChain()
 	}
 }
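With the rename from removeInvalidChain to removeChain, the same recursive teardown is now used for invalid blocks, section timeouts and idle rounds alike: remove the section, then recurse into its child. A toy sketch of that child-link walk (a plain linked structure standing in for the blockpool's section graph):

package main

import "fmt"

// section is a stand-in for a blockpool section: a chain linked via child pointers.
type section struct {
	name  string
	child *section
}

func (s *section) remove() {
	fmt.Println("removed", s.name)
}

// removeChain removes this section and all its descendants, mirroring the
// recursion in the diff (grab the child before remove() delinks the section).
func (s *section) removeChain() {
	child := s.child
	s.remove()
	if child != nil {
		child.removeChain()
	}
}

func main() {
	// a -> b -> c
	c := &section{name: "c"}
	b := &section{name: "b", child: c}
	a := &section{name: "a", child: b}
	a.removeChain() // removes a, b, c in order
}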

View File

@@ -51,7 +51,6 @@ type Status struct {
 func (self *BlockPool) Status() *Status {
 	self.status.lock.Lock()
 	defer self.status.lock.Unlock()
-	self.status.values.BlockHashesInPool = len(self.pool)
 	self.status.values.ActivePeers = len(self.status.activePeers)
 	self.status.values.BestPeers = len(self.status.bestPeers)
 	self.status.values.BadPeers = len(self.status.badPeers)