td update from node

- reorg and simplify AddBlock
- introduce nodeCache
- TestPeerPromotionByTdOnBlock unskipped and passes
- move switchC/idleC channel creation around: solves deadlock (now respects the contract with section process: either can activate or complete at any one time)
This commit is contained in:
zelig 2015-04-10 16:31:00 +01:00
parent fc1d1f9afd
commit da7332a731
4 changed files with 149 additions and 140 deletions

View File

@ -169,6 +169,9 @@ type BlockPool struct {
// alloc-easy pool of hash slices
hashSlicePool chan []common.Hash
nodeCache map[common.Hash]*node
nodeCacheLock sync.RWMutex
// waitgroup is used in tests to wait for result-critical routines
// as well as in determining idle / syncing status
wg sync.WaitGroup //
@ -210,6 +213,7 @@ func (self *BlockPool) Start() {
self.Config.init()
self.hashSlicePool = make(chan []common.Hash, 150)
self.nodeCache = make(map[common.Hash]*node)
self.status = newStatus()
self.quit = make(chan bool)
self.pool = make(map[common.Hash]*entry)
@ -615,127 +619,104 @@ LOOP:
If the block received is the head block of the current best peer, signal it to the head section process
*/
func (self *BlockPool) AddBlock(block *types.Block, peerId string) {
hash := block.Hash()
sender, _ := self.peers.getPeer(peerId)
if sender == nil {
return
}
self.status.lock.Lock()
self.status.activePeers[peerId]++
self.status.lock.Unlock()
entry := self.get(hash)
blockIsCurrentHead := false
sender.lock.RLock()
currentBlockHash := sender.currentBlockHash
currentBlock := sender.currentBlock
currentBlockC := sender.currentBlockC
switchC := sender.switchC
sender.lock.RUnlock()
// a peer's current head block is appearing the first time
if hash == currentBlockHash {
// this happens when block came in a newblock message but
// also if sent in a blockmsg (for instance, if we requested, only if we
// dont apply on blockrequests the restriction of flood control)
blockIsCurrentHead = true
if currentBlock == nil {
sender.lock.Lock()
sender.setChainInfoFromBlock(block)
sender.lock.Unlock()
self.status.lock.Lock()
self.status.values.BlockHashes++
self.status.values.Blocks++
self.status.values.BlocksInPool++
self.status.lock.Unlock()
// signal to head section process
select {
case currentBlockC <- block:
case <-switchC:
}
} else {
plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) already known", hex(hash), peerId, hex(currentBlockHash))
}
} else {
plog.DebugDetailf("AddBlock: block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(currentBlockHash))
/* @zelig !!!
requested 5 hashes from both A & B. A responds sooner then B, process blocks. Close section.
delayed B sends you block ... UNREQUESTED. Blocked
if entry == nil {
plog.DebugDetailf("AddBlock: unrequested block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash))
sender.addError(ErrUnrequestedBlock, "%x", hash)
self.status.lock.Lock()
self.status.badPeers[peerId]++
self.status.lock.Unlock()
return
}
*/
}
if entry == nil {
// FIXME: here check the cache find or create node -
// put peer as blockBy!
return
}
node := entry.node
node.lock.Lock()
defer node.lock.Unlock()
// register peer on node as source
if node.peers == nil {
node.peers = make(map[string]bool)
}
FoundBlockCurrentHead, found := node.peers[sender.id]
if !found || FoundBlockCurrentHead {
// if found but not FoundBlockCurrentHead, then no update
// necessary (||)
node.peers[sender.id] = blockIsCurrentHead
// for those that are false, TD will update their head
// for those that are true, TD is checked !
// this is checked at the time of TD calculation in checkTD
}
// check if block already received
if node.block != nil {
plog.DebugDetailf("AddBlock: block %s from peer <%s> (head: %s) already sent by <%s> ", hex(hash), peerId, hex(sender.currentBlockHash), node.blockBy)
}
hash := block.Hash()
// check if block is already inserted in the blockchain
if self.hasBlock(hash) {
plog.DebugDetailf("AddBlock: block %s from peer <%s> (head: %s) already in the blockchain", hex(hash), peerId, hex(sender.currentBlockHash))
return
}
/*
@zelig needs discussing
Viktor: pow check can be delayed in a go routine and therefore cache
creation is not blocking
// validate block for PoW
if !self.verifyPoW(block) {
plog.Warnf("AddBlock: invalid PoW on block %s from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash))
sender.addError(ErrInvalidPoW, "%x", hash)
sender, _ := self.peers.getPeer(peerId)
if sender == nil {
return
}
tdFromCurrentHead, currentBlockHash := sender.setChainInfoFromBlock(block)
self.status.lock.Lock()
self.status.badPeers[peerId]++
self.status.lock.Unlock()
entry := self.get(hash)
return
}
/* @zelig !!!
requested 5 hashes from both A & B. A responds sooner then B, process blocks. Close section.
delayed B sends you block ... UNREQUESTED. Blocked
if entry == nil {
plog.DebugDetailf("AddBlock: unrequested block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash))
sender.addError(ErrUnrequestedBlock, "%x", hash)
self.status.lock.Lock()
self.status.badPeers[peerId]++
self.status.lock.Unlock()
return
}
*/
node.block = block
node.blockBy = peerId
var bnode *node
if entry == nil {
self.nodeCacheLock.Lock()
bnode, _ = self.nodeCache[hash]
if bnode == nil {
bnode = &node{
hash: currentBlockHash,
block: block,
hashBy: peerId,
blockBy: peerId,
td: tdFromCurrentHead,
}
self.nodeCache[hash] = bnode
}
self.nodeCacheLock.Unlock()
} else {
bnode = entry.node
}
self.status.lock.Lock()
self.status.values.Blocks++
self.status.values.BlocksInPool++
self.status.lock.Unlock()
bnode.lock.Lock()
defer bnode.lock.Unlock()
// check if block already received
if bnode.block != nil {
plog.DebugDetailf("AddBlock: block %s from peer <%s> (head: %s) already sent by <%s> ", hex(hash), peerId, hex(sender.currentBlockHash), bnode.blockBy)
// register peer on node as source
if bnode.peers == nil {
bnode.peers = make(map[string]bool)
}
foundBlockCurrentHead, found := bnode.peers[sender.id]
if !found || foundBlockCurrentHead {
// if found but not FoundBlockCurrentHead, then no update
// necessary (||)
bnode.peers[sender.id] = (currentBlockHash == hash)
// for those that are false, TD will update their head
// for those that are true, TD is checked !
// this is checked at the time of TD calculation in checkTD
}
sender.setChainInfoFromNode(bnode)
} else {
/*
@zelig needs discussing
Viktor: pow check can be delayed in a go routine and therefore cache
creation is not blocking
// validate block for PoW
if !self.verifyPoW(block) {
plog.Warnf("AddBlock: invalid PoW on block %s from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash))
sender.addError(ErrInvalidPoW, "%x", hash)
self.status.lock.Lock()
self.status.badPeers[peerId]++
self.status.lock.Unlock()
return
}
*/
bnode.block = block
bnode.blockBy = peerId
bnode.td = tdFromCurrentHead
self.status.lock.Lock()
self.status.values.Blocks++
self.status.values.BlocksInPool++
self.status.lock.Unlock()
}
}

View File

@ -128,7 +128,7 @@ func TestErrInsufficientChainInfo(t *testing.T) {
}
func TestIncorrectTD(t *testing.T) {
t.Skip() // td not tested atm
t.Skip("skipping TD check until network is healthy")
test.LogInit()
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil

View File

@ -18,6 +18,7 @@ type peer struct {
// last known blockchain status
td *big.Int
tdAdvertised bool
currentBlockHash common.Hash
currentBlock *types.Block
parentHash common.Hash
@ -135,21 +136,52 @@ func (self *peer) addError(code int, format string, params ...interface{}) {
}
// caller must hold peer lock
func (self *peer) setChainInfo(td *big.Int, c common.Hash) {
self.td = td
self.currentBlockHash = c
self.currentBlock = nil
self.parentHash = common.Hash{}
self.headSection = nil
func (self *peer) setChainInfo(td *big.Int, currentBlockHash common.Hash) {
self.lock.Lock()
defer self.lock.Unlock()
if self.currentBlockHash != currentBlockHash {
previousBlockHash := self.currentBlockHash
plog.Debugf("addPeer: Update peer <%s> with td %v and current block %s (was %v)", self.id, td, hex(currentBlockHash), hex(previousBlockHash))
self.td = td
self.currentBlockHash = currentBlockHash
self.currentBlock = nil
self.parentHash = common.Hash{}
self.headSection = nil
}
self.tdAdvertised = true
}
// caller must hold peer lock
func (self *peer) setChainInfoFromBlock(block *types.Block) {
// use the optional TD to update peer td, this helps second best peer selection
func (self *peer) setChainInfoFromBlock(block *types.Block) (td *big.Int, currentBlockHash common.Hash) {
self.lock.Lock()
defer self.lock.Unlock()
hash := block.Hash()
// this happens when block came in a newblock message but
// also if sent in a blockmsg (for instance, if we requested, only if we
// dont apply on blockrequests the restriction of flood control)
currentBlockHash = self.currentBlockHash
if currentBlockHash == hash && self.currentBlock == nil {
// signal to head section process
plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) received\n", hex(hash), self.id, hex(currentBlockHash))
select {
case self.currentBlockC <- block:
case <-self.switchC:
}
return self.td, currentBlockHash
} else {
plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) already known", hex(hash), self.id, hex(currentBlockHash))
return nil, currentBlockHash
}
}
// this will use the TD given by the first peer to update peer td, this helps second best peer selection
// :FIXME: node
func (self *peer) setChainInfoFromNode(n *node) {
// in case best peer is lost
if block.Td != nil && block.Td.Cmp(self.td) > 0 {
plog.DebugDetailf("setChainInfoFromBlock: update <%s> - head: %v->%v - TD: %v->%v", self.id, hex(self.currentBlockHash), hex(block.Hash()), self.td, block.Td)
self.td = block.Td
block := n.block
hash := block.Hash()
if n.td != nil && n.td.Cmp(self.td) > 0 {
plog.DebugDetailf("AddBlock: update peer <%s> - head: %v->%v - TD: %v->%v", self.id, hex(self.currentBlockHash), hex(hash), self.td, n.td)
self.td = n.td
self.currentBlockHash = block.Hash()
self.parentHash = block.ParentHash()
self.currentBlock = block
@ -218,17 +250,11 @@ func (self *peers) addPeer(
if found {
// when called on an already connected peer, it means a newBlockMsg is received
// peer head info is updated
p.lock.Lock()
if p.currentBlockHash != currentBlockHash {
previousBlockHash = p.currentBlockHash
plog.Debugf("addPeer: Update peer <%s> with td %v and current block %s (was %v)", id, td, hex(currentBlockHash), hex(previousBlockHash))
p.setChainInfo(td, currentBlockHash)
self.status.lock.Lock()
self.status.values.NewBlocks++
self.status.lock.Unlock()
}
p.lock.Unlock()
p.setChainInfo(td, currentBlockHash)
// FIXME: only count the same block once
self.status.lock.Lock()
self.status.values.NewBlocks++
self.status.lock.Unlock()
} else {
p = self.newPeer(td, currentBlockHash, id, requestBlockHashes, requestBlocks, peerError)
@ -333,8 +359,8 @@ func (self *BlockPool) switchPeer(oldp, newp *peer) {
close(oldp.switchC)
}
if newp != nil {
newp.idleC = make(chan bool)
newp.switchC = make(chan bool)
// newp.idleC = make(chan bool)
// newp.switchC = make(chan bool)
// if new best peer has no head section yet, create it and run it
// otherwise head section is an element of peer.sections
if newp.headSection == nil {
@ -354,6 +380,9 @@ func (self *BlockPool) switchPeer(oldp, newp *peer) {
}
}()
} else {
newp.idleC = make(chan bool)
newp.switchC = make(chan bool)
}
var connected = make(map[common.Hash]*section)
@ -528,10 +557,12 @@ func (self *peer) getBlockHashes() bool {
// main loop for head section process
func (self *peer) run() {
self.lock.RLock()
self.lock.Lock()
self.switchC = make(chan bool)
self.idleC = make(chan bool)
switchC := self.switchC
plog.Debugf("HeadSection: <%s> section process for head %s started", self.id, hex(self.currentBlockHash))
self.lock.RUnlock()
self.lock.Unlock()
self.blockHashesRequestTimer = nil

View File

@ -145,7 +145,6 @@ func TestAddPeer(t *testing.T) {
}
func TestPeerPromotionByTdOnBlock(t *testing.T) {
t.Skip()
test.LogInit()
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
@ -155,28 +154,26 @@ func TestPeerPromotionByTdOnBlock(t *testing.T) {
peer2 := blockPoolTester.newPeer("peer2", 4, 4)
blockPool.Start()
blockPoolTester.tds = make(map[int]int)
blockPoolTester.tds[3] = 3
// pool
peer0.AddPeer()
peer0.serveBlocks(1, 2)
best := peer1.AddPeer()
// this tests that peer1 is not promoted over peer0 yet
if best {
t.Errorf("peer1 (TD=1) should not be set as best")
return
}
best = peer2.AddPeer()
peer2.serveBlocks(3, 4)
peer2.serveBlockHashes(4, 3, 2, 1)
// hashes := blockPoolTester.hashPool.IndexesToHashes([]int{2, 3})
peer1.serveBlocks(2, 3)
peer1.sendBlocks(3, 4)
blockPool.RemovePeer("peer2")
if blockPool.peers.best.id != "peer1" {
t.Errorf("peer1 (TD=3) should be set as best")
return
}
peer1.serveBlocks(0, 1, 2)
peer1.serveBlocks(0, 1, 2, 3)
blockPool.Wait(waitTimeout)
blockPool.Stop()