Correct bugs in new sync threading

This commit is contained in:
Age Manning 2019-09-07 09:31:05 +10:00
parent 812e1fbe26
commit 04b47a357b
No known key found for this signature in database
GPG Key ID: 05EED64B79E06A93
5 changed files with 17 additions and 27 deletions

View File

@ -9,7 +9,7 @@ use eth2_libp2p::{
}; };
use futures::future::Future; use futures::future::Future;
use futures::stream::Stream; use futures::stream::Stream;
use slog::{debug, trace, warn}; use slog::{debug, o, trace, warn};
use ssz::{Decode, DecodeError}; use ssz::{Decode, DecodeError};
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc; use tokio::sync::mpsc;
@ -51,7 +51,8 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
executor: &tokio::runtime::TaskExecutor, executor: &tokio::runtime::TaskExecutor,
log: slog::Logger, log: slog::Logger,
) -> error::Result<mpsc::UnboundedSender<HandlerMessage>> { ) -> error::Result<mpsc::UnboundedSender<HandlerMessage>> {
trace!(log, "Service starting"); let message_handler_log = log.new(o!("Service"=> "Message Handler"));
trace!(message_handler_log, "Service starting");
let (handler_send, handler_recv) = mpsc::unbounded_channel(); let (handler_send, handler_recv) = mpsc::unbounded_channel();
@ -63,7 +64,7 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
let mut handler = MessageHandler { let mut handler = MessageHandler {
network_send, network_send,
message_processor, message_processor,
log: log.clone(), log: message_handler_log,
}; };
// spawn handler task and move the message handler instance into the spawned thread // spawn handler task and move the message handler instance into the spawned thread

View File

@ -34,13 +34,8 @@ impl<T: BeaconChainTypes + 'static> Service<T> {
// build the network channel // build the network channel
let (network_send, network_recv) = mpsc::unbounded_channel::<NetworkMessage>(); let (network_send, network_recv) = mpsc::unbounded_channel::<NetworkMessage>();
// launch message handler thread // launch message handler thread
let message_handler_log = log.new(o!("Service" => "MessageHandler")); let message_handler_send =
let message_handler_send = MessageHandler::spawn( MessageHandler::spawn(beacon_chain, network_send.clone(), executor, log.clone())?;
beacon_chain,
network_send.clone(),
executor,
message_handler_log,
)?;
let network_log = log.new(o!("Service" => "Network")); let network_log = log.new(o!("Service" => "Network"));
// launch libp2p service // launch libp2p service

View File

@ -251,7 +251,7 @@ pub fn spawn<T: BeaconChainTypes>(
// create an instance of the SyncManager // create an instance of the SyncManager
let sync_manager = SyncManager { let sync_manager = SyncManager {
chain: beacon_chain, chain: beacon_chain,
state: ManagerState::Regular, state: ManagerState::Stalled,
input_channel: sync_recv, input_channel: sync_recv,
network, network,
import_queue: HashMap::new(), import_queue: HashMap::new(),
@ -510,7 +510,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
&mut self, &mut self,
peer_id: PeerId, peer_id: PeerId,
request_id: RequestId, request_id: RequestId,
blocks: Vec<BeaconBlock<T::EthSpec>>, mut blocks: Vec<BeaconBlock<T::EthSpec>>,
) { ) {
// find the request // find the request
let parent_request = match self let parent_request = match self
@ -545,6 +545,11 @@ impl<T: BeaconChainTypes> SyncManager<T> {
return; return;
} }
// add the block to response
parent_request
.downloaded_blocks
.push(blocks.pop().expect("must exist"));
// queue for processing // queue for processing
parent_request.state = BlockRequestsState::ReadyToProcess; parent_request.state = BlockRequestsState::ReadyToProcess;
} }
@ -594,7 +599,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
"peer" => format!("{:?}", peer_id), "peer" => format!("{:?}", peer_id),
); );
self.full_peers.insert(peer_id); self.full_peers.insert(peer_id);
self.update_state();
} }
/* Processing State Functions */ /* Processing State Functions */
@ -1077,7 +1081,6 @@ impl<T: BeaconChainTypes> Future for SyncManager<T> {
Ok(Async::Ready(Some(message))) => match message { Ok(Async::Ready(Some(message))) => match message {
SyncMessage::AddPeer(peer_id, info) => { SyncMessage::AddPeer(peer_id, info) => {
self.add_peer(peer_id, info); self.add_peer(peer_id, info);
dbg!("add peer");
} }
SyncMessage::BeaconBlocksResponse { SyncMessage::BeaconBlocksResponse {
peer_id, peer_id,
@ -1118,17 +1121,13 @@ impl<T: BeaconChainTypes> Future for SyncManager<T> {
//need to be called. //need to be called.
let mut re_run = false; let mut re_run = false;
dbg!(self.import_queue.len());
// only process batch requests if there are any // only process batch requests if there are any
if !self.import_queue.is_empty() { if !self.import_queue.is_empty() {
// process potential block requests // process potential block requests
self.process_potential_block_requests(); self.process_potential_block_requests();
dbg!(self.import_queue.len());
// process any complete long-range batches // process any complete long-range batches
re_run = re_run || self.process_complete_batches(); re_run = re_run || self.process_complete_batches();
dbg!(self.import_queue.len());
dbg!(&self.state);
} }
// only process parent objects if we are in regular sync // only process parent objects if we are in regular sync
@ -1140,9 +1139,6 @@ impl<T: BeaconChainTypes> Future for SyncManager<T> {
re_run = re_run || self.process_complete_parent_requests(); re_run = re_run || self.process_complete_parent_requests();
} }
dbg!(self.import_queue.len());
dbg!(&self.state);
// Shutdown the thread if the chain has termined // Shutdown the thread if the chain has termined
if let None = self.chain.upgrade() { if let None = self.chain.upgrade() {
return Ok(Async::Ready(())); return Ok(Async::Ready(()));
@ -1152,8 +1148,6 @@ impl<T: BeaconChainTypes> Future for SyncManager<T> {
break; break;
} }
} }
dbg!(self.import_queue.len());
dbg!(&self.state);
// update the state of the manager // update the state of the manager
self.update_state(); self.update_state();

View File

@ -352,7 +352,7 @@ impl<T: BeaconChainTypes> MessageProcessor<T> {
"count" => beacon_blocks.len(), "count" => beacon_blocks.len(),
); );
self.send_to_sync(SyncMessage::RecentBeaconBlocksResponse { self.send_to_sync(SyncMessage::BeaconBlocksResponse {
peer_id, peer_id,
request_id, request_id,
beacon_blocks, beacon_blocks,
@ -368,12 +368,12 @@ impl<T: BeaconChainTypes> MessageProcessor<T> {
) { ) {
debug!( debug!(
self.log, self.log,
"BeaconBlocksResponse"; "RecentBeaconBlocksResponse";
"peer" => format!("{:?}", peer_id), "peer" => format!("{:?}", peer_id),
"count" => beacon_blocks.len(), "count" => beacon_blocks.len(),
); );
self.send_to_sync(SyncMessage::BeaconBlocksResponse { self.send_to_sync(SyncMessage::RecentBeaconBlocksResponse {
peer_id, peer_id,
request_id, request_id,
beacon_blocks, beacon_blocks,

@ -1 +1 @@
Subproject commit aaa1673f508103e11304833e0456e4149f880065 Subproject commit ae6dd9011df05fab8c7e651c09cf9c940973bf81