diff --git a/beacon_node/eth2-libp2p/src/rpc/handler.rs b/beacon_node/eth2-libp2p/src/rpc/handler.rs index dbc32c5a4..a69cd0cda 100644 --- a/beacon_node/eth2-libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2-libp2p/src/rpc/handler.rs @@ -1,4 +1,4 @@ -use super::methods::{RPCErrorResponse, RPCResponse, RequestId}; +use super::methods::RequestId; use super::protocol::{RPCError, RPCProtocol, RPCRequest}; use super::RPCEvent; use crate::rpc::protocol::{InboundFramed, OutboundFramed}; @@ -13,8 +13,8 @@ use smallvec::SmallVec; use std::time::{Duration, Instant}; use tokio_io::{AsyncRead, AsyncWrite}; -/// The time (in seconds) before a substream that is awaiting a response times out. -pub const RESPONSE_TIMEOUT: u64 = 9; +/// The time (in seconds) before a substream that is awaiting a response from the user times out. +pub const RESPONSE_TIMEOUT: u64 = 10; /// Implementation of `ProtocolsHandler` for the RPC protocol. pub struct RPCHandler @@ -314,7 +314,7 @@ where Ok(Async::Ready(response)) => { if let Some(response) = response { return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( - build_response(rpc_event, response), + RPCEvent::Response(rpc_event.id(), response), ))); } else { // stream closed early @@ -365,31 +365,3 @@ where Ok(Async::NotReady) } } - -/// Given a response back from a peer and the request that sent it, construct a response to send -/// back to the user. This allows for some data manipulation of responses given requests. -fn build_response(rpc_event: RPCEvent, rpc_response: RPCErrorResponse) -> RPCEvent { - let id = rpc_event.id(); - - // handle the types of responses - match rpc_response { - RPCErrorResponse::Success(response) => { - match response { - // if the response is block roots, tag on the extra request data - RPCResponse::BeaconBlockBodies(mut resp) => { - if let RPCEvent::Request(_id, RPCRequest::BeaconBlockBodies(bodies_req)) = - rpc_event - { - resp.block_roots = Some(bodies_req.block_roots); - } - RPCEvent::Response( - id, - RPCErrorResponse::Success(RPCResponse::BeaconBlockBodies(resp)), - ) - } - _ => RPCEvent::Response(id, RPCErrorResponse::Success(response)), - } - } - _ => RPCEvent::Response(id, rpc_response), - } -} diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index b86dcb969..6a9a40369 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -14,9 +14,7 @@ use slog::{debug, trace, warn}; use ssz::{Decode, DecodeError}; use std::sync::Arc; use tokio::sync::mpsc; -use types::{ - Attestation, AttesterSlashing, BeaconBlock, BeaconBlockHeader, ProposerSlashing, VoluntaryExit, -}; +use types::{Attestation, AttesterSlashing, BeaconBlock, ProposerSlashing, VoluntaryExit}; /// Handles messages received from the network and client and organises syncing. pub struct MessageHandler { @@ -56,9 +54,9 @@ impl MessageHandler { let (handler_send, handler_recv) = mpsc::unbounded_channel(); // Initialise sync and begin processing in thread - // generate the Message handler let sync = SimpleSync::new(beacon_chain.clone(), &log); + // generate the Message handler let mut handler = MessageHandler { _chain: beacon_chain.clone(), sync, @@ -66,7 +64,7 @@ impl MessageHandler { log: log.clone(), }; - // spawn handler task + // spawn handler task and move the message handler instance into the spawned thread executor.spawn( handler_recv .for_each(move |msg| Ok(handler.handle_message(msg))) @@ -89,11 +87,11 @@ impl MessageHandler { HandlerMessage::PeerDisconnected(peer_id) => { self.sync.on_disconnect(peer_id); } - // we have received an RPC message request/response + // An RPC message request/response has been received HandlerMessage::RPC(peer_id, rpc_event) => { self.handle_rpc_message(peer_id, rpc_event); } - // we have received an RPC message request/response + // An RPC message request/response has been received HandlerMessage::PubsubMessage(peer_id, gossip) => { self.handle_gossip(peer_id, gossip); } @@ -106,7 +104,7 @@ impl MessageHandler { fn handle_rpc_message(&mut self, peer_id: PeerId, rpc_message: RPCEvent) { match rpc_message { RPCEvent::Request(id, req) => self.handle_rpc_request(peer_id, id, req), - RPCEvent::Response(_id, resp) => self.handle_rpc_response(peer_id, resp), + RPCEvent::Response(id, resp) => self.handle_rpc_response(peer_id, id, resp), RPCEvent::Error(id, error) => self.handle_rpc_error(peer_id, id, error), } } @@ -121,46 +119,39 @@ impl MessageHandler { &mut self.network_context, ), RPCRequest::Goodbye(goodbye_reason) => self.sync.on_goodbye(peer_id, goodbye_reason), - RPCRequest::BeaconBlockRoots(request) => self.sync.on_beacon_block_roots_request( + RPCRequest::BeaconBlocks(request) => self.sync.on_beacon_blocks_request( peer_id, request_id, request, &mut self.network_context, ), - RPCRequest::BeaconBlockHeaders(request) => self.sync.on_beacon_block_headers_request( + RPCRequest::RecentBeaconBlocks(request) => self.sync.on_recent_beacon_blocks_request( peer_id, request_id, request, &mut self.network_context, ), - RPCRequest::BeaconBlockBodies(request) => self.sync.on_beacon_block_bodies_request( - peer_id, - request_id, - request, - &mut self.network_context, - ), - RPCRequest::BeaconChainState(_) => { - // We do not implement this endpoint, it is not required and will only likely be - // useful for light-client support in later phases. - warn!(self.log, "BeaconChainState RPC call is not supported."); - } } } /// An RPC response has been received from the network. // we match on id and ignore responses past the timeout. - fn handle_rpc_response(&mut self, peer_id: PeerId, error_response: RPCErrorResponse) { + fn handle_rpc_response( + &mut self, + peer_id: PeerId, + request_id: RequestId, + error_response: RPCErrorResponse, + ) { // an error could have occurred. - // TODO: Handle Error gracefully match error_response { RPCErrorResponse::InvalidRequest(error) => { - warn!(self.log, "";"peer" => format!("{:?}", peer_id), "Invalid Request" => error.as_string()) + warn!(self.log, "Peer indicated invalid request";"peer_id" => format!("{:?}", peer_id), "error" => error.as_string()) } RPCErrorResponse::ServerError(error) => { - warn!(self.log, "";"peer" => format!("{:?}", peer_id), "Server Error" => error.as_string()) + warn!(self.log, "Peer internal server error";"peer_id" => format!("{:?}", peer_id), "error" => error.as_string()) } RPCErrorResponse::Unknown(error) => { - warn!(self.log, "";"peer" => format!("{:?}", peer_id), "Unknown Error" => error.as_string()) + warn!(self.log, "Unknown peer error";"peer" => format!("{:?}", peer_id), "error" => error.as_string()) } RPCErrorResponse::Success(response) => { match response { @@ -171,49 +162,37 @@ impl MessageHandler { &mut self.network_context, ); } - RPCResponse::BeaconBlockRoots(response) => { - self.sync.on_beacon_block_roots_response( - peer_id, - response, - &mut self.network_context, - ); - } - RPCResponse::BeaconBlockHeaders(response) => { - match self.decode_block_headers(response) { - Ok(decoded_block_headers) => { - self.sync.on_beacon_block_headers_response( + RPCResponse::BeaconBlocks(response) => { + match self.decode_beacon_blocks(response) { + Ok(beacon_blocks) => { + self.sync.on_beacon_blocks_response( peer_id, - decoded_block_headers, + beacon_blocks, &mut self.network_context, ); } - Err(_e) => { - warn!(self.log, "Peer sent invalid block headers";"peer" => format!("{:?}", peer_id)) + Err(e) => { + // TODO: Down-vote Peer + warn!(self.log, "Peer sent invalid BEACON_BLOCKS response";"peer" => format!("{:?}", peer_id), "error" => format!("{:?}", e)); } } } - RPCResponse::BeaconBlockBodies(response) => { - match self.decode_block_bodies(response) { - Ok(decoded_block_bodies) => { - self.sync.on_beacon_block_bodies_response( + RPCResponse::RecentBeaconBlocks(response) => { + match self.decode_beacon_blocks(response) { + Ok(beacon_blocks) => { + self.sync.on_recent_beacon_blocks_response( + request_id, peer_id, - decoded_block_bodies, + beacon_blocks, &mut self.network_context, ); } - Err(_e) => { - warn!(self.log, "Peer sent invalid block bodies";"peer" => format!("{:?}", peer_id)) + Err(e) => { + // TODO: Down-vote Peer + warn!(self.log, "Peer sent invalid BEACON_BLOCKS response";"peer" => format!("{:?}", peer_id), "error" => format!("{:?}", e)); } } } - RPCResponse::BeaconChainState(_) => { - // We do not implement this endpoint, it is not required and will only likely be - // useful for light-client support in later phases. - // - // Theoretically, we shouldn't reach this code because we should never send a - // beacon state RPC request. - warn!(self.log, "BeaconChainState RPC call is not supported."); - } } } } @@ -334,36 +313,22 @@ impl MessageHandler { /* Req/Resp Domain Decoding */ - /// Verifies and decodes the ssz-encoded block bodies received from peers. - fn decode_block_bodies( + /// Verifies and decodes an ssz-encoded list of `BeaconBlock`s. This list may contain empty + /// entries encoded with an SSZ NULL. + fn decode_beacon_blocks( &self, - bodies_response: BeaconBlockBodiesResponse, - ) -> Result, DecodeError> { + beacon_blocks: &[u8], + ) -> Result>, DecodeError> { //TODO: Implement faster block verification before decoding entirely - let block_bodies = Vec::from_ssz_bytes(&bodies_response.block_bodies)?; - Ok(DecodedBeaconBlockBodiesResponse { - block_roots: bodies_response - .block_roots - .expect("Responses must have associated roots"), - block_bodies, - }) - } - - /// Verifies and decodes the ssz-encoded block headers received from peers. - fn decode_block_headers( - &self, - headers_response: BeaconBlockHeadersResponse, - ) -> Result, DecodeError> { - //TODO: Implement faster header verification before decoding entirely - Vec::from_ssz_bytes(&headers_response.headers) + Vec::from_ssz_bytes(&beacon_blocks) } } -// TODO: RPC Rewrite makes this struct fairly pointless +/// Wraps a Network Channel to employ various RPC/Sync related network functionality. pub struct NetworkContext { /// The network channel to relay messages to the Network service. network_send: mpsc::UnboundedSender, - /// The `MessageHandler` logger. + /// Logger for the `NetworkContext`. log: slog::Logger, } @@ -388,7 +353,7 @@ impl NetworkContext { &mut self, peer_id: PeerId, request_id: RequestId, - rpc_response: RPCResponse, + rpc_response: RPCErrorResponse, ) { self.send_rpc_event( peer_id, diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs new file mode 100644 index 000000000..52c1a72c6 --- /dev/null +++ b/beacon_node/network/src/sync/manager.rs @@ -0,0 +1,283 @@ + +const MAXIMUM_BLOCKS_PER_REQUEST: usize = 10; +const SIMULTANEOUS_REQUESTS: usize = 10; +use super::simple_sync::FUTURE_SLOT_TOLERANCE; + +struct Chunk { + id: usize, + start_slot: Slot, + end_slot: Slot, + } + + +struct CompletedChunk { + peer_id: PeerId, + chunk: Chunk, + blocks: Vec, +} + +struct ProcessedChunk { + peer_id: PeerId, + chunk: Chunk, +} + +#[derive(PartialEq)] +pub enum SyncState { + Idle, + Downloading, + ColdSync { + max_wanted_slot: Slot, + max_wanted_hash: Hash256, + } +} + +pub enum SyncManagerState { + RequestBlocks(peer_id, BeaconBlockRequest), + Stalled, + Idle, +} + +pub struct PeerSyncInfo { + peer_id: PeerId, + fork_version: [u8,4], + finalized_root: Hash256, + finalized_epoch: Epoch, + head_root: Hash256, + head_slot: Slot, + requested_slot_skip: Option<(Slot, usize)>, +} + +pub(crate) struct SyncManager { + /// A reference to the underlying beacon chain. + chain: Arc>, + /// A mapping of Peers to their respective PeerSyncInfo. + available_peers: HashMap, + wanted_chunks: Vec, + pending_chunks: HashMap, + completed_chunks: Vec, + processed_chunks: Vec, // ordered + multi_peer_sections: HashMap + + current_requests: usize, + latest_wanted_slot: Option, + sync_status: SyncStatus, + to_process_chunk_id: usize, + log: Logger, + +} + +impl SyncManager { + /// Adds a sync-able peer and determines which blocks to download given the current state of + /// the chain, known peers and currently requested blocks. + fn add_sync_peer(&mut self, peer_id: PeerId, remote: PeerSyncInfo, network &mut NetworkContext) { + + let local = PeerSyncInfo::from(&self.chain); + let remote_finalized_slot = remote.finalized_epoch.start_slot(T::EthSpec::slots_per_epoch()); + let local_finalized_slot = local.finalized_epoch.start_slot(T::EthSpec::slots_per_epoch()); + + // cold sync + if remote_finalized_slot > local.head_slot { + if let SyncState::Idle || SyncState::Downloading = self.sync_state { + info!(self.log, "Cold Sync Started", "start_slot" => local.head_slot, "latest_known_finalized" => remote_finalized_slot); + self.sync_state = SyncState::ColdSync{Slot::from(0), remote.finalized_hash} + } + + if let SyncState::ColdSync{max_wanted_slot, max_wanted_hjash } = self.sync_state { + + // We don't assume that our current head is the canonical chain. So we request blocks from + // our last finalized slot to ensure we are on the finalized chain. + if max_wanted_slot < remote_finalized_slot { + let remaining_blocks = remote_finalized_slot - max_wanted_slot; + for chunk in (0..remaining_blocks/MAXIMUM_BLOCKS_PER_REQUEST) { + self.wanted_chunks.push( + Chunk { + id: self.current_chunk_id, + previous_chunk: self.curent_chunk_id.saturating_sub(1), + start_slot: chunk*MAXIMUM_BLOCKS_PER_REQUEST + self.last_wanted_slot, + end_slot: (section+1)*MAXIMUM_BLOCKS_PER_REQUEST +self.last_wanted_slot, + }) + self.current_chunk_id +=1; + } + + // add any extra partial chunks + self.pending_section.push( Section { + start_slot: (remaining_blocks/MAXIMUM_BLOCKS_PER_REQUEST) + 1, + end_slot: remote_finalized_slot, + }) + self.current_chunk_id +=1; + + info!(self.log, "Cold Sync Updated", "start_slot" => local.head_slot, "latest_known_finalized" => remote_finalized_slot); + + self.sync_state = SyncState::ColdSync{remote_finalized_slot, remote.finalized_hash} + } + } + + else { // hot sync + if remote_head_slot > self.chain.head().beacon_state.slot { + if let SyncState::Idle = self.sync_state { + self.sync_state = SyncState::Downloading + info!(self.log, "Sync Started", "start_slot" => local.head_slot, "latest_known_head" => remote.head_slot.as_u64()); + } + self.latest_known_slot = remote_head_slot; + //TODO Build requests. + } + } + + available_peers.push(remote); + + } + + pub fn add_blocks(&mut self, chunk_id: RequestId, peer_id: PeerId, blocks: Vec) { + + if SyncState::ColdSync{max_wanted_slot, max_wanted_hash} = self.sync_state { + + let chunk = match self.pending_chunks.remove(&peer_id) { + Some(chunks) => { + match chunks.find(|chunk| chunk.id == chunk_id) { + Some(chunk) => chunk, + None => { + warn!(self.log, "Received blocks for an unknown chunk"; + "peer"=> peer_id); + return; + } + } + }, + None => { + warn!(self.log, "Received blocks without a request"; + "peer"=> peer_id); + return; + } + }; + + // add to completed + self.current_requests -= 1; + self.completed_chunks.push(CompletedChunk(peer_id, Chunk)); + } + } + + pub fn inject_error(id: RequestId, peer_id) { + if let SyncState::ColdSync{ _max_wanted_slot, _max_wanted_hash } { + match self.pending_chunks.get(&peer_id) { + Some(chunks) => { + if let Some(pos) = chunks.iter().position(|c| c.id == id) { + chunks.remove(pos); + } + }, + None => { + debug!(self.log, + "Received an error for an unknown request"; + "request_id" => id, + "peer" => peer_id + ); + } + } + } + } + + pub fn poll(&mut self) -> SyncManagerState { + + // if cold sync + if let SyncState::ColdSync(waiting_slot, max_wanted_slot, max_wanted_hash) = self.sync_state { + + // Try to process completed chunks + for completed_chunk in self.completed_chunks { + let chunk = completed_chunk.1; + let last_chunk_id = { + let no_processed_chunks = self.processed_chunks.len(); + if elements == 0 { 0 } else { self.processed_chunks[no_processed_chunks].id } + }; + if chunk.id == last_chunk_id + 1 { + // try and process the chunk + for block in chunk.blocks { + let processing_result = self.chain.process_block(block.clone()); + + if let Ok(outcome) = processing_result { + match outcome { + BlockProcessingOutCome::Processed { block_root} => { + // block successfully processed + }, + BlockProcessingOutcome::BlockIsAlreadyKnown => { + warn!( + self.log, "Block Already Known"; + "source" => source, + "sync" => "Cold Sync", + "parent_root" => format!("{}", parent), + "baby_block_slot" => block.slot, + "peer" => format!("{:?}", chunk.0), + ); + }, + _ => { + // An error has occurred + // This could be due to the previous chunk or the current chunk. + // Re-issue both. + warn!( + self.log, "Faulty Chunk"; + "source" => source, + "sync" => "Cold Sync", + "parent_root" => format!("{}", parent), + "baby_block_slot" => block.slot, + "peer" => format!("{:?}", chunk.0), + "outcome" => format!("{:?}", outcome), + ); + + // re-issue both chunks + // if both are the same peer. Downgrade the peer. + let past_chunk = self.processed_chunks.pop() + self.wanted_chunks.insert(0, chunk.clone()); + self.wanted_chunks.insert(0, past_chunk.clone()); + if chunk.0 == past_chunk.peer_id { + // downgrade peer + return SyncManagerState::DowngradePeer(chunk.0); + } + break; + } + } + } + } + // chunk successfully processed + debug!(self.log, + "Chunk Processed"; + "id" => chunk.id + "start_slot" => chunk.start_slot, + "end_slot" => chunk.end_slot, + ); + self.processed_chunks.push(chunk); + } + } + + // chunks completed, update the state + self.sync_state = SyncState::ColdSync{waiting_slot, max_wanted_slot, max_wanted_hash}; + + // Remove stales + + // Spawn requests + if self.current_requests <= SIMULTANEOUS_REQUESTS { + if !self.wanted_chunks.is_empty() { + let chunk = self.wanted_chunks.remove(0); + for n in (0..self.peers.len()).rev() { + let peer = self.peers.swap_remove(n); + let peer_finalized_slot = peer.finalized_epoch.start_slot(T::EthSpec::slots_per_epoch()); + if peer_finalized_slot >= chunk.end_slot { + *self.pending.chunks.entry(&peer_id).or_insert_with(|| Vec::new).push(chunk); + self.active_peers.push(peer); + self.current_requests +=1; + let block_request = BeaconBlockRequest { + head_block_root, + start_slot: chunk.start_slot, + count: chunk.end_slot - chunk.start_slot + step: 1 + } + return SyncManagerState::BlockRequest(peer, block_request); + } + } + // no peers for this chunk + self.wanted_chunks.push(chunk); + return SyncManagerState::Stalled + } + } + } + + // if hot sync + return SyncManagerState::Idle + + } diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index c3271888a..e3d3d7cef 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -17,7 +17,7 @@ use types::{ /// The number of slots that we can import blocks ahead of us, before going into full Sync mode. const SLOT_IMPORT_TOLERANCE: u64 = 100; -/// The amount of seconds a block (or partial block) may exist in the import queue. +/// The amount of seconds a block may exist in the import queue. const QUEUE_STALE_SECS: u64 = 100; /// If a block is more than `FUTURE_SLOT_TOLERANCE` slots ahead of our slot clock, we drop it. @@ -30,23 +30,23 @@ const SHOULD_NOT_FORWARD_GOSSIP_BLOCK: bool = false; /// Keeps track of syncing information for known connected peers. #[derive(Clone, Copy, Debug)] pub struct PeerSyncInfo { - network_id: u8, - chain_id: u64, - latest_finalized_root: Hash256, - latest_finalized_epoch: Epoch, - best_root: Hash256, - best_slot: Slot, + fork_version: [u8,4], + finalized_root: Hash256, + finalized_epoch: Epoch, + head_root: Hash256, + head_slot: Slot, + requested_slot_skip: Option<(Slot, usize)>, } impl From for PeerSyncInfo { fn from(hello: HelloMessage) -> PeerSyncInfo { PeerSyncInfo { - network_id: hello.network_id, - chain_id: hello.chain_id, - latest_finalized_root: hello.latest_finalized_root, - latest_finalized_epoch: hello.latest_finalized_epoch, - best_root: hello.best_root, - best_slot: hello.best_slot, + fork_version: hello.fork_version, + finalized_root: hello.finalized_root, + finalized_epoch: hello.finalized_epoch, + head_root: hello.head_root, + head_slot: hello.head_slot, + requested_slot_skip: None, } } } @@ -71,8 +71,6 @@ pub struct SimpleSync { chain: Arc>, /// A mapping of Peers to their respective PeerSyncInfo. known_peers: HashMap, - /// A queue to allow importing of blocks - import_queue: ImportQueue, /// The current state of the syncing protocol. state: SyncState, log: slog::Logger, @@ -178,8 +176,8 @@ impl SimpleSync { let start_slot = |epoch: Epoch| epoch.start_slot(T::EthSpec::slots_per_epoch()); - if local.network_id != remote.network_id { - // The node is on a different network, disconnect them. + if local.fork_version != remote.fork_version { + // The node is on a different network/fork, disconnect them. info!( self.log, "HandshakeFailure"; "peer" => format!("{:?}", peer_id), @@ -187,9 +185,9 @@ impl SimpleSync { ); network.disconnect(peer_id.clone(), GoodbyeReason::IrrelevantNetwork); - } else if remote.latest_finalized_epoch <= local.latest_finalized_epoch - && remote.latest_finalized_root != Hash256::zero() - && local.latest_finalized_root != Hash256::zero() + } else if remote.finalized_epoch <= local.finalized_epoch + && remote.finalized_root != Hash256::zero() + && local.finalized_root != Hash256::zero() && (self.root_at_slot(start_slot(remote.latest_finalized_epoch)) != Some(remote.latest_finalized_root)) { @@ -248,22 +246,37 @@ impl SimpleSync { "remote_latest_finalized_epoch" => remote.latest_finalized_epoch, ); - let start_slot = local - .latest_finalized_epoch - .start_slot(T::EthSpec::slots_per_epoch()); - let required_slots = remote.best_slot - start_slot; - self.request_block_roots( - peer_id, - BeaconBlockRootsRequest { - start_slot, - count: required_slots.as_u64(), - }, - network, - ); + self.process_sync(); } } + self.proess_sync(&mut self) { + loop { + match self.sync_manager.poll() { + SyncManagerState::RequestBlocks(peer_id, req) { + debug!( + self.log, + "RPCRequest(BeaconBlockBodies)"; + "count" => req.block_roots.len(), + "peer" => format!("{:?}", peer_id) + ); + network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlocks(req)); + }, + SyncManagerState::Stalled { + // need more peers to continue sync + warn!(self.log, "No useable peers for sync"); + break; + }, + SyncManagerState::Idle { + // nothing to do + break; + } + } + } + } + + fn root_at_slot(&self, target_slot: Slot) -> Option { self.chain .rev_iter_block_roots(target_slot) @@ -272,213 +285,27 @@ impl SimpleSync { .map(|(root, _slot)| root) } - /// Handle a `BeaconBlockRoots` request from the peer. - pub fn on_beacon_block_roots_request( + /// Handle a `BeaconBlocks` request from the peer. + pub fn on_beacon_blocks_request( &mut self, peer_id: PeerId, request_id: RequestId, - req: BeaconBlockRootsRequest, + req: BeaconBlocksRequest, network: &mut NetworkContext, ) { let state = &self.chain.head().beacon_state; debug!( self.log, - "BlockRootsRequest"; + "BeaconBlocksRequest"; "peer" => format!("{:?}", peer_id), "count" => req.count, "start_slot" => req.start_slot, ); - let mut roots: Vec = self - .chain - .rev_iter_block_roots(std::cmp::min(req.start_slot + req.count, state.slot)) - .take_while(|(_root, slot)| req.start_slot <= *slot) - .map(|(block_root, slot)| BlockRootSlot { slot, block_root }) - .collect(); - - if roots.len() as u64 != req.count { - debug!( - self.log, - "BlockRootsRequest"; - "peer" => format!("{:?}", peer_id), - "msg" => "Failed to return all requested hashes", - "start_slot" => req.start_slot, - "current_slot" => self.chain.present_slot(), - "requested" => req.count, - "returned" => roots.len(), - ); - } - - roots.reverse(); - roots.dedup_by_key(|brs| brs.block_root); - - network.send_rpc_response( - peer_id, - request_id, - RPCResponse::BeaconBlockRoots(BeaconBlockRootsResponse { roots }), - ) - } - - /// Handle a `BeaconBlockRoots` response from the peer. - pub fn on_beacon_block_roots_response( - &mut self, - peer_id: PeerId, - res: BeaconBlockRootsResponse, - network: &mut NetworkContext, - ) { - debug!( - self.log, - "BlockRootsResponse"; - "peer" => format!("{:?}", peer_id), - "count" => res.roots.len(), - ); - - if res.roots.is_empty() { - warn!( - self.log, - "Peer returned empty block roots response"; - "peer_id" => format!("{:?}", peer_id) - ); - return; - } - - // The wire protocol specifies that slots must be in ascending order. - if !res.slots_are_ascending() { - warn!( - self.log, - "Peer returned block roots response with bad slot ordering"; - "peer_id" => format!("{:?}", peer_id) - ); - return; - } - - let new_roots = self - .import_queue - .enqueue_block_roots(&res.roots, peer_id.clone()); - - // No new roots means nothing to do. - // - // This check protects against future panics. - if new_roots.is_empty() { - return; - } - - // Determine the first (earliest) and last (latest) `BlockRootSlot` items. - // - // This logic relies upon slots to be in ascending order, which is enforced earlier. - let first = new_roots.first().expect("Non-empty list must have first"); - let last = new_roots.last().expect("Non-empty list must have last"); - - // Request all headers between the earliest and latest new `BlockRootSlot` items. - self.request_block_headers( - peer_id, - BeaconBlockHeadersRequest { - start_root: first.block_root, - start_slot: first.slot, - max_headers: (last.slot - first.slot + 1).as_u64(), - skip_slots: 0, - }, - network, - ) - } - - /// Handle a `BeaconBlockHeaders` request from the peer. - pub fn on_beacon_block_headers_request( - &mut self, - peer_id: PeerId, - request_id: RequestId, - req: BeaconBlockHeadersRequest, - network: &mut NetworkContext, - ) { - let state = &self.chain.head().beacon_state; - - debug!( - self.log, - "BlockHeadersRequest"; - "peer" => format!("{:?}", peer_id), - "count" => req.max_headers, - ); - - let count = req.max_headers; - - // Collect the block roots. - let mut roots: Vec = self - .chain - .rev_iter_block_roots(std::cmp::min(req.start_slot + count, state.slot)) - .take_while(|(_root, slot)| req.start_slot <= *slot) - .map(|(root, _slot)| root) - .collect(); - - roots.reverse(); - roots.dedup(); - - let headers: Vec = roots - .into_iter() - .step_by(req.skip_slots as usize + 1) - .filter_map(|root| { - let block = self - .chain - .store - .get::>(&root) - .ok()?; - Some(block?.block_header()) - }) - .collect(); - - // ssz-encode the headers - let headers = headers.as_ssz_bytes(); - - network.send_rpc_response( - peer_id, - request_id, - RPCResponse::BeaconBlockHeaders(BeaconBlockHeadersResponse { headers }), - ) - } - - /// Handle a `BeaconBlockHeaders` response from the peer. - pub fn on_beacon_block_headers_response( - &mut self, - peer_id: PeerId, - headers: Vec, - network: &mut NetworkContext, - ) { - debug!( - self.log, - "BlockHeadersResponse"; - "peer" => format!("{:?}", peer_id), - "count" => headers.len(), - ); - - if headers.is_empty() { - warn!( - self.log, - "Peer returned empty block headers response. PeerId: {:?}", peer_id - ); - return; - } - - // Enqueue the headers, obtaining a list of the roots of the headers which were newly added - // to the queue. - let block_roots = self.import_queue.enqueue_headers(headers, peer_id.clone()); - - if !block_roots.is_empty() { - self.request_block_bodies(peer_id, BeaconBlockBodiesRequest { block_roots }, network); - } - } - - /// Handle a `BeaconBlockBodies` request from the peer. - pub fn on_beacon_block_bodies_request( - &mut self, - peer_id: PeerId, - request_id: RequestId, - req: BeaconBlockBodiesRequest, - network: &mut NetworkContext, - ) { - let block_bodies: Vec> = req - .block_roots - .iter() - .filter_map(|root| { + let blocks = Vec> = self + .chain.rev_iter_block_roots().filter(|(_root, slot) req.start_slot <= slot && req.start_slot + req.count >= slot).take_while(|(_root, slot) req.start_slot <= *slot) + .filter_map(|root, slot| { if let Ok(Some(block)) = self.chain.store.get::>(root) { Some(block.body) } else { @@ -494,59 +321,49 @@ impl SimpleSync { }) .collect(); - debug!( - self.log, - "BlockBodiesRequest"; - "peer" => format!("{:?}", peer_id), - "requested" => req.block_roots.len(), - "returned" => block_bodies.len(), - ); + roots.reverse(); + roots.dedup_by_key(|brs| brs.block_root); - let bytes = block_bodies.as_ssz_bytes(); + if roots.len() as u64 != req.count { + debug!( + self.log, + "BeaconBlocksRequest"; + "peer" => format!("{:?}", peer_id), + "msg" => "Failed to return all requested hashes", + "start_slot" => req.start_slot, + "current_slot" => self.chain.present_slot(), + "requested" => req.count, + "returned" => roots.len(), + ); + } network.send_rpc_response( peer_id, request_id, - RPCResponse::BeaconBlockBodies(BeaconBlockBodiesResponse { - block_bodies: bytes, - block_roots: None, - }), + RPCResponse::BeaconBlocks(blocks.as_ssz_bytes()), ) } - /// Handle a `BeaconBlockBodies` response from the peer. - pub fn on_beacon_block_bodies_response( + + /// Handle a `BeaconBlocks` response from the peer. + pub fn on_beacon_blocks_response( &mut self, peer_id: PeerId, - res: DecodedBeaconBlockBodiesResponse, + res: Vec>, network: &mut NetworkContext, ) { debug!( self.log, - "BlockBodiesResponse"; + "BeaconBlocksResponse"; "peer" => format!("{:?}", peer_id), "count" => res.block_bodies.len(), ); - if !res.block_bodies.is_empty() { - // Import all blocks to queue - let last_root = self - .import_queue - .enqueue_bodies(res.block_bodies, peer_id.clone()); - - // Attempt to process all received bodies by recursively processing the latest block - if let Some(root) = last_root { - if let Some(BlockProcessingOutcome::Processed { .. }) = - self.attempt_process_partial_block(peer_id, root, network, &"rpc") - { - // If processing is successful remove from `import_queue` - self.import_queue.remove(root); - } - } + if !res.is_empty() { + self.sync_manager.add_blocks(peer_id, blocks); } - // Clear out old entries - self.import_queue.remove_stale(); + self.process_sync(); } /// Process a gossip message declaring a new block. @@ -679,22 +496,6 @@ impl SimpleSync { network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockHeaders(req)); } - /// Request some `BeaconBlockBodies` from the remote peer. - fn request_block_bodies( - &mut self, - peer_id: PeerId, - req: BeaconBlockBodiesRequest, - network: &mut NetworkContext, - ) { - debug!( - self.log, - "RPCRequest(BeaconBlockBodies)"; - "count" => req.block_roots.len(), - "peer" => format!("{:?}", peer_id) - ); - - network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockBodies(req)); - } /// Returns `true` if `self.chain` has not yet processed this block. pub fn chain_has_seen_block(&self, block_root: &Hash256) -> bool {