diff --git a/eth/handler.go b/eth/handler.go index 8993afe15..918d71088 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -698,7 +698,7 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) { // Send the block to a subset of our peers transfer := peers[:int(math.Sqrt(float64(len(peers))))] for _, peer := range transfer { - peer.SendNewBlock(block, td) + peer.AsyncSendNewBlock(block, td) } log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt))) return @@ -706,7 +706,7 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) { // Otherwise if the block is indeed in out own chain, announce it if pm.blockchain.HasBlock(hash, block.NumberU64()) { for _, peer := range peers { - peer.SendNewBlockHashes([]common.Hash{hash}, []uint64{block.NumberU64()}) + peer.AsyncSendNewBlockHash(block) } log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt))) } @@ -727,7 +727,7 @@ func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) { } // FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))] for peer, txs := range txset { - peer.SendTransactions(txs) + peer.AsyncSendTransactions(txs) } } diff --git a/eth/peer.go b/eth/peer.go index 42ead5396..953aca17b 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -37,8 +37,24 @@ var ( ) const ( - maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS) - maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS) + maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS) + maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS) + + // maxQueuedTxs is the maximum number of transaction lists to queue up before + // dropping broadcasts. This is a sensitive number as a transaction list might + // contain a single transaction, or thousands. + maxQueuedTxs = 128 + + // maxQueuedProps is the maximum number of block propagations to queue up before + // dropping broadcasts. There's not much point in queueing stale blocks, so a few + // that might cover uncles should be enough. + maxQueuedProps = 4 + + // maxQueuedAnns is the maximum number of block announcements to queue up before + // dropping broadcasts. Similarly to block propagations, there's no point to queue + // above some healthy uncle limit, so use that. + maxQueuedAnns = 4 + handshakeTimeout = 5 * time.Second ) @@ -50,6 +66,12 @@ type PeerInfo struct { Head string `json:"head"` // SHA3 hash of the peer's best owned block } +// propEvent is a block propagation, waiting for its turn in the broadcast queue. +type propEvent struct { + block *types.Block + td *big.Int +} + type peer struct { id string @@ -63,23 +85,64 @@ type peer struct { td *big.Int lock sync.RWMutex - knownTxs *set.Set // Set of transaction hashes known to be known by this peer - knownBlocks *set.Set // Set of block hashes known to be known by this peer + knownTxs *set.Set // Set of transaction hashes known to be known by this peer + knownBlocks *set.Set // Set of block hashes known to be known by this peer + queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer + queuedProps chan *propEvent // Queue of blocks to broadcast to the peer + queuedAnns chan *types.Block // Queue of blocks to announce to the peer + term chan struct{} // Termination channel to stop the broadcaster } func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { - id := p.ID() - return &peer{ Peer: p, rw: rw, version: version, - id: fmt.Sprintf("%x", id[:8]), + id: fmt.Sprintf("%x", p.ID().Bytes()[:8]), knownTxs: set.New(), knownBlocks: set.New(), + queuedTxs: make(chan []*types.Transaction, maxQueuedTxs), + queuedProps: make(chan *propEvent, maxQueuedProps), + queuedAnns: make(chan *types.Block, maxQueuedAnns), + term: make(chan struct{}), } } +// broadcast is a write loop that multiplexes block propagations, announcements +// and transaction broadcasts into the remote peer. The goal is to have an async +// writer that does not lock up node internals. +func (p *peer) broadcast() { + for { + select { + case txs := <-p.queuedTxs: + if err := p.SendTransactions(txs); err != nil { + return + } + p.Log().Trace("Broadcast transactions", "count", len(txs)) + + case prop := <-p.queuedProps: + if err := p.SendNewBlock(prop.block, prop.td); err != nil { + return + } + p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td) + + case block := <-p.queuedAnns: + if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil { + return + } + p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash()) + + case <-p.term: + return + } + } +} + +// close signals the broadcast goroutine to terminate. +func (p *peer) close() { + close(p.term) +} + // Info gathers and returns a collection of metadata known about a peer. func (p *peer) Info() *PeerInfo { hash, td := p.Head() @@ -139,6 +202,19 @@ func (p *peer) SendTransactions(txs types.Transactions) error { return p2p.Send(p.rw, TxMsg, txs) } +// AsyncSendTransactions queues list of transactions propagation to a remote +// peer. If the peer's broadcast queue is full, the event is silently dropped. +func (p *peer) AsyncSendTransactions(txs []*types.Transaction) { + select { + case p.queuedTxs <- txs: + for _, tx := range txs { + p.knownTxs.Add(tx.Hash()) + } + default: + p.Log().Debug("Dropping transaction propagation", "count", len(txs)) + } +} + // SendNewBlockHashes announces the availability of a number of blocks through // a hash notification. func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error { @@ -153,12 +229,35 @@ func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error return p2p.Send(p.rw, NewBlockHashesMsg, request) } +// AsyncSendNewBlockHash queues the availability of a block for propagation to a +// remote peer. If the peer's broadcast queue is full, the event is silently +// dropped. +func (p *peer) AsyncSendNewBlockHash(block *types.Block) { + select { + case p.queuedAnns <- block: + p.knownBlocks.Add(block.Hash()) + default: + p.Log().Debug("Dropping block announcement", "number", block.NumberU64(), "hash", block.Hash()) + } +} + // SendNewBlock propagates an entire block to a remote peer. func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error { p.knownBlocks.Add(block.Hash()) return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td}) } +// AsyncSendNewBlock queues an entire block for propagation to a remote peer. If +// the peer's broadcast queue is full, the event is silently dropped. +func (p *peer) AsyncSendNewBlock(block *types.Block, td *big.Int) { + select { + case p.queuedProps <- &propEvent{block: block, td: td}: + p.knownBlocks.Add(block.Hash()) + default: + p.Log().Debug("Dropping block propagation", "number", block.NumberU64(), "hash", block.Hash()) + } +} + // SendBlockHeaders sends a batch of block headers to the remote peer. func (p *peer) SendBlockHeaders(headers []*types.Header) error { return p2p.Send(p.rw, BlockHeadersMsg, headers) @@ -313,7 +412,8 @@ func newPeerSet() *peerSet { } // Register injects a new peer into the working set, or returns an error if the -// peer is already known. +// peer is already known. If a new peer it registered, its broadcast loop is also +// started. func (ps *peerSet) Register(p *peer) error { ps.lock.Lock() defer ps.lock.Unlock() @@ -325,6 +425,8 @@ func (ps *peerSet) Register(p *peer) error { return errAlreadyRegistered } ps.peers[p.id] = p + go p.broadcast() + return nil } @@ -334,10 +436,13 @@ func (ps *peerSet) Unregister(id string) error { ps.lock.Lock() defer ps.lock.Unlock() - if _, ok := ps.peers[id]; !ok { + p, ok := ps.peers[id] + if !ok { return errNotRegistered } delete(ps.peers, id) + p.close() + return nil }