diff --git a/beacon_node/eth2-libp2p/src/peer_manager/mod.rs b/beacon_node/eth2-libp2p/src/peer_manager/mod.rs index 7060e8224..e09fbbb07 100644 --- a/beacon_node/eth2-libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2-libp2p/src/peer_manager/mod.rs @@ -318,24 +318,41 @@ impl Stream for PeerManager { fn poll(&mut self) -> Poll, Self::Error> { // poll the timeouts for pings and status' + // TODO: Remove task notifies and temporary vecs for stable futures + // These exist to handle a bug in delayqueue + let mut peers_to_add = Vec::new(); while let Async::Ready(Some(peer_id)) = self.ping_peers.poll().map_err(|e| { error!(self.log, "Failed to check for peers to ping"; "error" => format!("{}",e)); })? { debug!(self.log, "Pinging peer"; "peer_id" => format!("{}", peer_id)); // add the ping timer back - self.ping_peers.insert(peer_id.clone()); + peers_to_add.push(peer_id.clone()); self.events.push(PeerManagerEvent::Ping(peer_id)); } + if !peers_to_add.is_empty() { + futures::task::current().notify(); + } + while let Some(peer) = peers_to_add.pop() { + self.ping_peers.insert(peer); + } + while let Async::Ready(Some(peer_id)) = self.status_peers.poll().map_err(|e| { error!(self.log, "Failed to check for peers to status"; "error" => format!("{}",e)); })? { debug!(self.log, "Sending Status to peer"; "peer_id" => format!("{}", peer_id)); // add the status timer back - self.status_peers.insert(peer_id.clone()); + peers_to_add.push(peer_id.clone()); self.events.push(PeerManagerEvent::Status(peer_id)); } + if !peers_to_add.is_empty() { + futures::task::current().notify(); + } + while let Some(peer) = peers_to_add.pop() { + self.status_peers.insert(peer); + } + if !self.events.is_empty() { return Ok(Async::Ready(Some(self.events.remove(0)))); } else {