Correctly notify delay queues (#1087)
This commit is contained in:
parent
f4ac0422e2
commit
e0723dfc3b
@ -318,24 +318,41 @@ impl<TSpec: EthSpec> Stream for PeerManager<TSpec> {
|
|||||||
|
|
||||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||||
// poll the timeouts for pings and status'
|
// poll the timeouts for pings and status'
|
||||||
|
// TODO: Remove task notifies and temporary vecs for stable futures
|
||||||
|
// These exist to handle a bug in delayqueue
|
||||||
|
let mut peers_to_add = Vec::new();
|
||||||
while let Async::Ready(Some(peer_id)) = self.ping_peers.poll().map_err(|e| {
|
while let Async::Ready(Some(peer_id)) = self.ping_peers.poll().map_err(|e| {
|
||||||
error!(self.log, "Failed to check for peers to ping"; "error" => format!("{}",e));
|
error!(self.log, "Failed to check for peers to ping"; "error" => format!("{}",e));
|
||||||
})? {
|
})? {
|
||||||
debug!(self.log, "Pinging peer"; "peer_id" => format!("{}", peer_id));
|
debug!(self.log, "Pinging peer"; "peer_id" => format!("{}", peer_id));
|
||||||
// add the ping timer back
|
// add the ping timer back
|
||||||
self.ping_peers.insert(peer_id.clone());
|
peers_to_add.push(peer_id.clone());
|
||||||
self.events.push(PeerManagerEvent::Ping(peer_id));
|
self.events.push(PeerManagerEvent::Ping(peer_id));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !peers_to_add.is_empty() {
|
||||||
|
futures::task::current().notify();
|
||||||
|
}
|
||||||
|
while let Some(peer) = peers_to_add.pop() {
|
||||||
|
self.ping_peers.insert(peer);
|
||||||
|
}
|
||||||
|
|
||||||
while let Async::Ready(Some(peer_id)) = self.status_peers.poll().map_err(|e| {
|
while let Async::Ready(Some(peer_id)) = self.status_peers.poll().map_err(|e| {
|
||||||
error!(self.log, "Failed to check for peers to status"; "error" => format!("{}",e));
|
error!(self.log, "Failed to check for peers to status"; "error" => format!("{}",e));
|
||||||
})? {
|
})? {
|
||||||
debug!(self.log, "Sending Status to peer"; "peer_id" => format!("{}", peer_id));
|
debug!(self.log, "Sending Status to peer"; "peer_id" => format!("{}", peer_id));
|
||||||
// add the status timer back
|
// add the status timer back
|
||||||
self.status_peers.insert(peer_id.clone());
|
peers_to_add.push(peer_id.clone());
|
||||||
self.events.push(PeerManagerEvent::Status(peer_id));
|
self.events.push(PeerManagerEvent::Status(peer_id));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !peers_to_add.is_empty() {
|
||||||
|
futures::task::current().notify();
|
||||||
|
}
|
||||||
|
while let Some(peer) = peers_to_add.pop() {
|
||||||
|
self.status_peers.insert(peer);
|
||||||
|
}
|
||||||
|
|
||||||
if !self.events.is_empty() {
|
if !self.events.is_empty() {
|
||||||
return Ok(Async::Ready(Some(self.events.remove(0))));
|
return Ok(Async::Ready(Some(self.events.remove(0))));
|
||||||
} else {
|
} else {
|
||||||
|
Loading…
Reference in New Issue
Block a user