diff --git a/account_manager/src/main.rs b/account_manager/src/main.rs index b7448ddf2..ae3823049 100644 --- a/account_manager/src/main.rs +++ b/account_manager/src/main.rs @@ -125,9 +125,13 @@ fn main() { } } } - _ => panic!( - "The account manager must be run with a subcommand. See help for more information." - ), + _ => { + crit!( + log, + "The account manager must be run with a subcommand. See help for more information." + ); + return; + } } } diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 0f76507fe..aa9332c01 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -459,6 +459,15 @@ impl BeaconChain { None } + /// Returns the block canonical root of the current canonical chain at a given slot. + /// + /// Returns None if a block doesn't exist at the slot. + pub fn root_at_slot(&self, target_slot: Slot) -> Option { + self.rev_iter_block_roots() + .find(|(_root, slot)| *slot == target_slot) + .map(|(root, _slot)| root) + } + /// Reads the slot clock (see `self.read_slot_clock()` and returns the number of slots since /// genesis. pub fn slots_since_genesis(&self) -> Option { @@ -1017,7 +1026,7 @@ impl BeaconChain { }; // Load the parent blocks state from the database, returning an error if it is not found. - // It is an error because if know the parent block we should also know the parent state. + // It is an error because if we know the parent block we should also know the parent state. let parent_state_root = parent_block.state_root; let parent_state = self .store diff --git a/beacon_node/client/src/notifier.rs b/beacon_node/client/src/notifier.rs index d705637cb..343918d4d 100644 --- a/beacon_node/client/src/notifier.rs +++ b/beacon_node/client/src/notifier.rs @@ -34,10 +34,10 @@ pub fn run(client: &Client, executor: TaskExecutor, exit // Panics if libp2p is poisoned. let connected_peer_count = libp2p.lock().swarm.connected_peers(); - debug!(log, "Libp2p connected peer status"; "peer_count" => connected_peer_count); + debug!(log, "Connected peer status"; "peer_count" => connected_peer_count); if connected_peer_count <= WARN_PEER_COUNT { - warn!(log, "Low libp2p peer count"; "peer_count" => connected_peer_count); + warn!(log, "Low peer count"; "peer_count" => connected_peer_count); } Ok(()) diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml index caa5b28e4..59c799105 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -7,8 +7,8 @@ edition = "2018" [dependencies] clap = "2.32.0" #SigP repository -libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "61036890d574f5b46573952b20def2baafd6a6e9" } -enr = { git = "https://github.com/SigP/rust-libp2p/", rev = "61036890d574f5b46573952b20def2baafd6a6e9", features = ["serde"] } +libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "76f7475e4b7063e663ad03c7524cf091f9961968" } +enr = { git = "https://github.com/SigP/rust-libp2p/", rev = "76f7475e4b7063e663ad03c7524cf091f9961968", features = ["serde"] } types = { path = "../../eth2/types" } serde = "1.0" serde_derive = "1.0" diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index 2c574e46a..a47d32ec2 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -15,7 +15,7 @@ use libp2p::{ tokio_io::{AsyncRead, AsyncWrite}, NetworkBehaviour, PeerId, }; -use slog::{debug, o, trace}; +use slog::{debug, o}; use std::num::NonZeroU32; use std::time::Duration; @@ -90,13 +90,15 @@ impl NetworkBehaviourEventProcess { - trace!(self.log, "Received GossipEvent"); - + GossipsubEvent::Message(propagation_source, gs_msg) => { + let id = gs_msg.id(); let msg = PubsubMessage::from_topics(&gs_msg.topics, gs_msg.data); + // Note: We are keeping track here of the peer that sent us the message, not the + // peer that originally published the message. self.events.push(BehaviourEvent::GossipMessage { - source: gs_msg.source, + id, + source: propagation_source, topics: gs_msg.topics, message: msg, }); @@ -199,6 +201,13 @@ impl Behaviour { } } + /// Forwards a message that is waiting in gossipsub's mcache. Messages are only propagated + /// once validated by the beacon chain. + pub fn propagate_message(&mut self, propagation_source: &PeerId, message_id: String) { + self.gossipsub + .propagate_message(&message_id, propagation_source); + } + /* Eth2 RPC behaviour functions */ /// Sends an RPC Request/Response via the RPC protocol. @@ -214,12 +223,21 @@ impl Behaviour { /// The types of events than can be obtained from polling the behaviour. pub enum BehaviourEvent { + /// A received RPC event and the peer that it was received from. RPC(PeerId, RPCEvent), + /// We have completed an initial connection to a new peer. PeerDialed(PeerId), + /// A peer has disconnected. PeerDisconnected(PeerId), + /// A gossipsub message has been received. GossipMessage { + /// The gossipsub message id. Used when propagating blocks after validation. + id: String, + /// The peer from which we received this message, not the peer that published it. source: PeerId, + /// The topics that this message was sent on. topics: Vec, + /// The message itself. message: PubsubMessage, }, } diff --git a/beacon_node/eth2-libp2p/src/config.rs b/beacon_node/eth2-libp2p/src/config.rs index 7cb501c1f..fd44b99af 100644 --- a/beacon_node/eth2-libp2p/src/config.rs +++ b/beacon_node/eth2-libp2p/src/config.rs @@ -74,7 +74,8 @@ impl Default for Config { // parameter. gs_config: GossipsubConfigBuilder::new() .max_transmit_size(1_048_576) - .heartbeat_interval(Duration::from_secs(20)) + .heartbeat_interval(Duration::from_secs(20)) // TODO: Reduce for mainnet + .propagate_messages(false) // require validation before propagation .build(), boot_nodes: vec![], libp2p_nodes: vec![], diff --git a/beacon_node/eth2-libp2p/src/discovery.rs b/beacon_node/eth2-libp2p/src/discovery.rs index 4a8aba2b1..69ca39ad7 100644 --- a/beacon_node/eth2-libp2p/src/discovery.rs +++ b/beacon_node/eth2-libp2p/src/discovery.rs @@ -114,7 +114,7 @@ impl Discovery { self.find_peers(); } - /// Add an Enr to the routing table of the discovery mechanism. + /// Add an ENR to the routing table of the discovery mechanism. pub fn add_enr(&mut self, enr: Enr) { self.discovery.add_enr(enr); } @@ -169,6 +169,7 @@ where fn inject_connected(&mut self, peer_id: PeerId, _endpoint: ConnectedPoint) { self.connected_peers.insert(peer_id); + // TODO: Drop peers if over max_peer limit metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT); metrics::set_gauge(&metrics::PEERS_CONNECTED, self.connected_peers() as i64); diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs b/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs index 260a00346..1966bab62 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs @@ -152,45 +152,49 @@ impl Decoder for SSZOutboundCodec { type Error = RPCError; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - match self.inner.decode(src).map_err(RPCError::from) { - Ok(Some(packet)) => match self.protocol.message_name.as_str() { + if src.is_empty() { + // the object sent could be empty. We return the empty object if this is the case + match self.protocol.message_name.as_str() { "hello" => match self.protocol.version.as_str() { - "1" => Ok(Some(RPCResponse::Hello(HelloMessage::from_ssz_bytes( - &packet, - )?))), + "1" => Err(RPCError::Custom( + "Hello stream terminated unexpectedly".into(), + )), // cannot have an empty HELLO message. The stream has terminated unexpectedly _ => unreachable!("Cannot negotiate an unknown version"), }, "goodbye" => Err(RPCError::InvalidProtocol("GOODBYE doesn't have a response")), "beacon_blocks" => match self.protocol.version.as_str() { - "1" => Ok(Some(RPCResponse::BeaconBlocks(packet.to_vec()))), + "1" => Ok(Some(RPCResponse::BeaconBlocks(Vec::new()))), _ => unreachable!("Cannot negotiate an unknown version"), }, "recent_beacon_blocks" => match self.protocol.version.as_str() { - "1" => Ok(Some(RPCResponse::RecentBeaconBlocks(packet.to_vec()))), + "1" => Ok(Some(RPCResponse::RecentBeaconBlocks(Vec::new()))), _ => unreachable!("Cannot negotiate an unknown version"), }, _ => unreachable!("Cannot negotiate an unknown protocol"), - }, - Ok(None) => { - // the object sent could be a empty. We return the empty object if this is the case - match self.protocol.message_name.as_str() { + } + } else { + match self.inner.decode(src).map_err(RPCError::from) { + Ok(Some(packet)) => match self.protocol.message_name.as_str() { "hello" => match self.protocol.version.as_str() { - "1" => Ok(None), // cannot have an empty HELLO message. The stream has terminated unexpectedly + "1" => Ok(Some(RPCResponse::Hello(HelloMessage::from_ssz_bytes( + &packet, + )?))), _ => unreachable!("Cannot negotiate an unknown version"), }, "goodbye" => Err(RPCError::InvalidProtocol("GOODBYE doesn't have a response")), "beacon_blocks" => match self.protocol.version.as_str() { - "1" => Ok(Some(RPCResponse::BeaconBlocks(Vec::new()))), + "1" => Ok(Some(RPCResponse::BeaconBlocks(packet.to_vec()))), _ => unreachable!("Cannot negotiate an unknown version"), }, "recent_beacon_blocks" => match self.protocol.version.as_str() { - "1" => Ok(Some(RPCResponse::RecentBeaconBlocks(Vec::new()))), + "1" => Ok(Some(RPCResponse::RecentBeaconBlocks(packet.to_vec()))), _ => unreachable!("Cannot negotiate an unknown version"), }, _ => unreachable!("Cannot negotiate an unknown protocol"), - } + }, + Ok(None) => Ok(None), // waiting for more bytes + Err(e) => Err(e), } - Err(e) => Err(e), } } } diff --git a/beacon_node/eth2-libp2p/src/rpc/methods.rs b/beacon_node/eth2-libp2p/src/rpc/methods.rs index d912bcfa1..49813abe9 100644 --- a/beacon_node/eth2-libp2p/src/rpc/methods.rs +++ b/beacon_node/eth2-libp2p/src/rpc/methods.rs @@ -157,3 +157,53 @@ impl ErrorMessage { String::from_utf8(self.error_message.clone()).unwrap_or_else(|_| "".into()) } } + +impl std::fmt::Display for HelloMessage { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Hello Message: Fork Version: {:?}, Finalized Root: {}, Finalized Epoch: {}, Head Root: {}, Head Slot: {}", self.fork_version, self.finalized_root, self.finalized_epoch, self.head_root, self.head_slot) + } +} + +impl std::fmt::Display for RPCResponse { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RPCResponse::Hello(hello) => write!(f, "{}", hello), + RPCResponse::BeaconBlocks(data) => write!(f, ", len: {}", data.len()), + RPCResponse::RecentBeaconBlocks(data) => { + write!(f, ", len: {}", data.len()) + } + } + } +} + +impl std::fmt::Display for RPCErrorResponse { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RPCErrorResponse::Success(res) => write!(f, "{}", res), + RPCErrorResponse::InvalidRequest(err) => write!(f, "Invalid Request: {:?}", err), + RPCErrorResponse::ServerError(err) => write!(f, "Server Error: {:?}", err), + RPCErrorResponse::Unknown(err) => write!(f, "Unknown Error: {:?}", err), + } + } +} + +impl std::fmt::Display for GoodbyeReason { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + GoodbyeReason::ClientShutdown => write!(f, "Client Shutdown"), + GoodbyeReason::IrrelevantNetwork => write!(f, "Irrelevant Network"), + GoodbyeReason::Fault => write!(f, "Fault"), + GoodbyeReason::Unknown => write!(f, "Unknown Reason"), + } + } +} + +impl std::fmt::Display for BeaconBlocksRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Head Block Root: {}, Start Slot: {}, Count: {}, Step: {}", + self.head_block_root, self.start_slot, self.count, self.step + ) + } +} diff --git a/beacon_node/eth2-libp2p/src/rpc/mod.rs b/beacon_node/eth2-libp2p/src/rpc/mod.rs index 756a62e71..2076615a9 100644 --- a/beacon_node/eth2-libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/mod.rs @@ -47,6 +47,16 @@ impl RPCEvent { } } +impl std::fmt::Display for RPCEvent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RPCEvent::Request(id, req) => write!(f, "RPC Request(Id: {}, {})", id, req), + RPCEvent::Response(id, res) => write!(f, "RPC Response(Id: {}, {})", id, res), + RPCEvent::Error(id, err) => write!(f, "RPC Request(Id: {}, Error: {:?})", id, err), + } + } +} + /// Implements the libp2p `NetworkBehaviour` trait and therefore manages network-level /// logic. pub struct RPC { diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index be1efdf5d..401fa8b9e 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -288,3 +288,14 @@ impl std::error::Error for RPCError { } } } + +impl std::fmt::Display for RPCRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RPCRequest::Hello(hello) => write!(f, "Hello Message: {}", hello), + RPCRequest::Goodbye(reason) => write!(f, "Goodbye: {}", reason), + RPCRequest::BeaconBlocks(req) => write!(f, "Beacon Blocks: {}", req), + RPCRequest::RecentBeaconBlocks(req) => write!(f, "Recent Beacon Blocks: {:?}", req), + } + } +} diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index 34781927c..dac011752 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -79,15 +79,32 @@ impl Service { } }; - // attempt to connect to user-input libp2p nodes - for multiaddr in config.libp2p_nodes { + // helper closure for dialing peers + let mut dial_addr = |multiaddr: Multiaddr| { match Swarm::dial_addr(&mut swarm, multiaddr.clone()) { Ok(()) => debug!(log, "Dialing libp2p peer"; "address" => format!("{}", multiaddr)), Err(err) => debug!( log, - "Could not connect to peer"; "address" => format!("{}", multiaddr), "Error" => format!("{:?}", err) + "Could not connect to peer"; "address" => format!("{}", multiaddr), "error" => format!("{:?}", err) ), }; + }; + + // attempt to connect to user-input libp2p nodes + for multiaddr in config.libp2p_nodes { + dial_addr(multiaddr); + } + + // attempt to connect to any specified boot-nodes + for bootnode_enr in config.boot_nodes { + for multiaddr in bootnode_enr.multiaddr() { + // ignore udp multiaddr if it exists + let components = multiaddr.iter().collect::>(); + if let Protocol::Udp(_) = components[1] { + continue; + } + dial_addr(multiaddr); + } } // subscribe to default gossipsub topics @@ -145,16 +162,16 @@ impl Stream for Service { fn poll(&mut self) -> Poll, Self::Error> { loop { match self.swarm.poll() { - //Behaviour events Ok(Async::Ready(Some(event))) => match event { - // TODO: Stub here for debugging BehaviourEvent::GossipMessage { + id, source, topics, message, } => { trace!(self.log, "Gossipsub message received"; "service" => "Swarm"); return Ok(Async::Ready(Some(Libp2pEvent::PubsubMessage { + id, source, topics, message, @@ -222,6 +239,7 @@ pub enum Libp2pEvent { PeerDisconnected(PeerId), /// Received pubsub message. PubsubMessage { + id: String, source: PeerId, topics: Vec, message: PubsubMessage, diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index dc08bd311..06fc06dde 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -19,3 +19,4 @@ futures = "0.1.25" error-chain = "0.12.0" tokio = "0.1.16" parking_lot = "0.9.0" +smallvec = "0.6.10" diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index c14fc970d..782d2129e 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -1,6 +1,6 @@ use crate::error; use crate::service::NetworkMessage; -use crate::sync::SimpleSync; +use crate::sync::MessageProcessor; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::{ behaviour::PubsubMessage, @@ -9,18 +9,22 @@ use eth2_libp2p::{ }; use futures::future::Future; use futures::stream::Stream; -use slog::{debug, trace, warn}; +use slog::{debug, o, trace, warn}; use ssz::{Decode, DecodeError}; use std::sync::Arc; use tokio::sync::mpsc; use types::{Attestation, AttesterSlashing, BeaconBlock, ProposerSlashing, VoluntaryExit}; -/// Handles messages received from the network and client and organises syncing. +/// Handles messages received from the network and client and organises syncing. This +/// functionality of this struct is to validate an decode messages from the network before +/// passing them to the internal message processor. The message processor spawns a syncing thread +/// which manages which blocks need to be requested and processed. pub struct MessageHandler { - /// Currently loaded and initialised beacon chain. - _chain: Arc>, - /// The syncing framework. - sync: SimpleSync, + /// A channel to the network service to allow for gossip propagation. + network_send: mpsc::UnboundedSender, + /// Processes validated and decoded messages from the network. Has direct access to the + /// sync manager. + message_processor: MessageProcessor, /// The `MessageHandler` logger. log: slog::Logger, } @@ -34,8 +38,9 @@ pub enum HandlerMessage { PeerDisconnected(PeerId), /// An RPC response/request has been received. RPC(PeerId, RPCEvent), - /// A gossip message has been received. - PubsubMessage(PeerId, PubsubMessage), + /// A gossip message has been received. The fields are: message id, the peer that sent us this + /// message and the message itself. + PubsubMessage(String, PeerId, PubsubMessage), } impl MessageHandler { @@ -46,17 +51,20 @@ impl MessageHandler { executor: &tokio::runtime::TaskExecutor, log: slog::Logger, ) -> error::Result> { - trace!(log, "Service starting"); + let message_handler_log = log.new(o!("Service"=> "Message Handler")); + trace!(message_handler_log, "Service starting"); let (handler_send, handler_recv) = mpsc::unbounded_channel(); - // Initialise sync and begin processing in thread - let sync = SimpleSync::new(beacon_chain.clone(), network_send, &log); + + // Initialise a message instance, which itself spawns the syncing thread. + let message_processor = + MessageProcessor::new(executor, beacon_chain, network_send.clone(), &log); // generate the Message handler let mut handler = MessageHandler { - _chain: beacon_chain.clone(), - sync, - log: log.clone(), + network_send, + message_processor, + log: message_handler_log, }; // spawn handler task and move the message handler instance into the spawned thread @@ -65,7 +73,11 @@ impl MessageHandler { .for_each(move |msg| Ok(handler.handle_message(msg))) .map_err(move |_| { debug!(log, "Network message handler terminated."); - }), + }), /* + .then(move |_| { + debug!(log.clone(), "Message handler shutdown"); + }), + */ ); Ok(handler_send) @@ -76,19 +88,19 @@ impl MessageHandler { match message { // we have initiated a connection to a peer HandlerMessage::PeerDialed(peer_id) => { - self.sync.on_connect(peer_id); + self.message_processor.on_connect(peer_id); } // A peer has disconnected HandlerMessage::PeerDisconnected(peer_id) => { - self.sync.on_disconnect(peer_id); + self.message_processor.on_disconnect(peer_id); } // An RPC message request/response has been received HandlerMessage::RPC(peer_id, rpc_event) => { self.handle_rpc_message(peer_id, rpc_event); } // An RPC message request/response has been received - HandlerMessage::PubsubMessage(peer_id, gossip) => { - self.handle_gossip(peer_id, gossip); + HandlerMessage::PubsubMessage(id, peer_id, gossip) => { + self.handle_gossip(id, peer_id, gossip); } } } @@ -108,7 +120,7 @@ impl MessageHandler { fn handle_rpc_request(&mut self, peer_id: PeerId, request_id: RequestId, request: RPCRequest) { match request { RPCRequest::Hello(hello_message) => { - self.sync + self.message_processor .on_hello_request(peer_id, request_id, hello_message) } RPCRequest::Goodbye(goodbye_reason) => { @@ -117,13 +129,13 @@ impl MessageHandler { "peer" => format!("{:?}", peer_id), "reason" => format!("{:?}", goodbye_reason), ); - self.sync.on_disconnect(peer_id); + self.message_processor.on_disconnect(peer_id); } RPCRequest::BeaconBlocks(request) => self - .sync + .message_processor .on_beacon_blocks_request(peer_id, request_id, request), RPCRequest::RecentBeaconBlocks(request) => self - .sync + .message_processor .on_recent_beacon_blocks_request(peer_id, request_id, request), } } @@ -150,12 +162,13 @@ impl MessageHandler { RPCErrorResponse::Success(response) => { match response { RPCResponse::Hello(hello_message) => { - self.sync.on_hello_response(peer_id, hello_message); + self.message_processor + .on_hello_response(peer_id, hello_message); } RPCResponse::BeaconBlocks(response) => { match self.decode_beacon_blocks(&response) { Ok(beacon_blocks) => { - self.sync.on_beacon_blocks_response( + self.message_processor.on_beacon_blocks_response( peer_id, request_id, beacon_blocks, @@ -170,7 +183,7 @@ impl MessageHandler { RPCResponse::RecentBeaconBlocks(response) => { match self.decode_beacon_blocks(&response) { Ok(beacon_blocks) => { - self.sync.on_recent_beacon_blocks_response( + self.message_processor.on_recent_beacon_blocks_response( peer_id, request_id, beacon_blocks, @@ -194,24 +207,37 @@ impl MessageHandler { } /// Handle RPC messages - fn handle_gossip(&mut self, peer_id: PeerId, gossip_message: PubsubMessage) { + fn handle_gossip(&mut self, id: String, peer_id: PeerId, gossip_message: PubsubMessage) { match gossip_message { PubsubMessage::Block(message) => match self.decode_gossip_block(message) { Ok(block) => { - let _should_forward_on = self.sync.on_block_gossip(peer_id, block); + let should_forward_on = self + .message_processor + .on_block_gossip(peer_id.clone(), block); + // TODO: Apply more sophisticated validation and decoding logic + if should_forward_on { + self.propagate_message(id, peer_id.clone()); + } } Err(e) => { debug!(self.log, "Invalid gossiped beacon block"; "peer_id" => format!("{}", peer_id), "Error" => format!("{:?}", e)); } }, PubsubMessage::Attestation(message) => match self.decode_gossip_attestation(message) { - Ok(attestation) => self.sync.on_attestation_gossip(peer_id, attestation), + Ok(attestation) => { + // TODO: Apply more sophisticated validation and decoding logic + self.propagate_message(id, peer_id.clone()); + self.message_processor + .on_attestation_gossip(peer_id, attestation); + } Err(e) => { debug!(self.log, "Invalid gossiped attestation"; "peer_id" => format!("{}", peer_id), "Error" => format!("{:?}", e)); } }, PubsubMessage::VoluntaryExit(message) => match self.decode_gossip_exit(message) { Ok(_exit) => { + // TODO: Apply more sophisticated validation and decoding logic + self.propagate_message(id, peer_id.clone()); // TODO: Handle exits debug!(self.log, "Received a voluntary exit"; "peer_id" => format!("{}", peer_id) ); } @@ -222,6 +248,8 @@ impl MessageHandler { PubsubMessage::ProposerSlashing(message) => { match self.decode_gossip_proposer_slashing(message) { Ok(_slashing) => { + // TODO: Apply more sophisticated validation and decoding logic + self.propagate_message(id, peer_id.clone()); // TODO: Handle proposer slashings debug!(self.log, "Received a proposer slashing"; "peer_id" => format!("{}", peer_id) ); } @@ -233,6 +261,8 @@ impl MessageHandler { PubsubMessage::AttesterSlashing(message) => { match self.decode_gossip_attestation_slashing(message) { Ok(_slashing) => { + // TODO: Apply more sophisticated validation and decoding logic + self.propagate_message(id, peer_id.clone()); // TODO: Handle attester slashings debug!(self.log, "Received an attester slashing"; "peer_id" => format!("{}", peer_id) ); } @@ -248,6 +278,21 @@ impl MessageHandler { } } + /// Informs the network service that the message should be forwarded to other peers. + fn propagate_message(&mut self, message_id: String, propagation_source: PeerId) { + self.network_send + .try_send(NetworkMessage::Propagate { + propagation_source, + message_id, + }) + .unwrap_or_else(|_| { + warn!( + self.log, + "Could not send propagation request to the network service" + ) + }); + } + /* Decoding of gossipsub objects from the network. * * The decoding is done in the message handler as it has access to to a `BeaconChain` and can diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index a8b3c74b6..1357b5495 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -34,13 +34,8 @@ impl Service { // build the network channel let (network_send, network_recv) = mpsc::unbounded_channel::(); // launch message handler thread - let message_handler_log = log.new(o!("Service" => "MessageHandler")); - let message_handler_send = MessageHandler::spawn( - beacon_chain, - network_send.clone(), - executor, - message_handler_log, - )?; + let message_handler_send = + MessageHandler::spawn(beacon_chain, network_send.clone(), executor, log.clone())?; let network_log = log.new(o!("Service" => "Network")); // launch libp2p service @@ -159,12 +154,23 @@ fn network_service( // poll the network channel match network_recv.poll() { Ok(Async::Ready(Some(message))) => match message { - NetworkMessage::Send(peer_id, outgoing_message) => match outgoing_message { - OutgoingMessage::RPC(rpc_event) => { - trace!(log, "Sending RPC Event: {:?}", rpc_event); - libp2p_service.lock().swarm.send_rpc(peer_id, rpc_event); - } - }, + NetworkMessage::RPC(peer_id, rpc_event) => { + trace!(log, "{}", rpc_event); + libp2p_service.lock().swarm.send_rpc(peer_id, rpc_event); + } + NetworkMessage::Propagate { + propagation_source, + message_id, + } => { + trace!(log, "Propagating gossipsub message"; + "propagation_peer" => format!("{:?}", propagation_source), + "message_id" => format!("{}", message_id), + ); + libp2p_service + .lock() + .swarm + .propagate_message(&propagation_source, message_id); + } NetworkMessage::Publish { topics, message } => { debug!(log, "Sending pubsub message"; "topics" => format!("{:?}",topics)); libp2p_service.lock().swarm.publish(&topics, message); @@ -185,7 +191,7 @@ fn network_service( match libp2p_service.lock().poll() { Ok(Async::Ready(Some(event))) => match event { Libp2pEvent::RPC(peer_id, rpc_event) => { - trace!(log, "RPC Event: RPC message received: {:?}", rpc_event); + trace!(log, "{}", rpc_event); message_handler_send .try_send(HandlerMessage::RPC(peer_id, rpc_event)) .map_err(|_| "Failed to send RPC to handler")?; @@ -203,13 +209,14 @@ fn network_service( .map_err(|_| "Failed to send PeerDisconnected to handler")?; } Libp2pEvent::PubsubMessage { - source, message, .. + id, + source, + message, + .. } => { - //TODO: Decide if we need to propagate the topic upwards. (Potentially for - //attestations) message_handler_send - .try_send(HandlerMessage::PubsubMessage(source, message)) - .map_err(|_| " failed to send pubsub message to handler")?; + .try_send(HandlerMessage::PubsubMessage(id, source, message)) + .map_err(|_| "Failed to send pubsub message to handler")?; } }, Ok(Async::Ready(None)) => unreachable!("Stream never ends"), @@ -225,19 +232,16 @@ fn network_service( /// Types of messages that the network service can receive. #[derive(Debug)] pub enum NetworkMessage { - /// Send a message to libp2p service. - //TODO: Define typing for messages across the wire - Send(PeerId, OutgoingMessage), - /// Publish a message to pubsub mechanism. + /// Send an RPC message to the libp2p service. + RPC(PeerId, RPCEvent), + /// Publish a message to gossipsub. Publish { topics: Vec, message: PubsubMessage, }, -} - -/// Type of outgoing messages that can be sent through the network service. -#[derive(Debug)] -pub enum OutgoingMessage { - /// Send an RPC request/response. - RPC(RPCEvent), + /// Propagate a received gossipsub message + Propagate { + propagation_source: PeerId, + message_id: String, + }, } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 9cce6300d..171d0fdf0 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -1,113 +1,309 @@ -use super::simple_sync::{PeerSyncInfo, FUTURE_SLOT_TOLERANCE}; +//! The `SyncManager` facilities the block syncing logic of lighthouse. The current networking +//! specification provides two methods from which to obtain blocks from peers. The `BeaconBlocks` +//! request and the `RecentBeaconBlocks` request. The former is used to obtain a large number of +//! blocks and the latter allows for searching for blocks given a block-hash. +//! +//! These two RPC methods are designed for two type of syncing. +//! - Long range (batch) sync, when a client is out of date and needs to the latest head. +//! - Parent lookup - when a peer provides us a block whose parent is unknown to us. +//! +//! Both of these syncing strategies are built into the `SyncManager`. +//! +//! +//! Currently the long-range (batch) syncing method functions by opportunistically downloading +//! batches blocks from all peers who know about a chain that we do not. When a new peer connects +//! which has a later head that is greater than `SLOT_IMPORT_TOLERANCE` from our current head slot, +//! the manager's state becomes `Syncing` and begins a batch syncing process with this peer. If +//! further peers connect, this process is run in parallel with those peers, until our head is +//! within `SLOT_IMPORT_TOLERANCE` of all connected peers. +//! +//! Batch Syncing +//! +//! This syncing process start by requesting `MAX_BLOCKS_PER_REQUEST` blocks from a peer with an +//! unknown chain (with a greater slot height) starting from our current head slot. If the earliest +//! block returned is known to us, then the group of blocks returned form part of a known chain, +//! and we process this batch of blocks, before requesting more batches forward and processing +//! those in turn until we reach the peer's chain's head. If the first batch doesn't contain a +//! block we know of, we must iteratively request blocks backwards (until our latest finalized head +//! slot) until we find a common ancestor before we can start processing the blocks. If no common +//! ancestor is found, the peer has a chain which is not part of our finalized head slot and we +//! drop the peer and the downloaded blocks. +//! Once we are fully synced with all known peers, the state of the manager becomes `Regular` which +//! then allows for parent lookups of propagated blocks. +//! +//! A schematic version of this logic with two chain variations looks like the following. +//! +//! |----------------------|---------------------------------| +//! ^finalized head ^current local head ^remotes head +//! +//! +//! An example of the remotes chain diverging before our current head. +//! |---------------------------| +//! ^---------------------------------------------| +//! ^chain diverges |initial batch| ^remotes head +//! +//! In this example, we cannot process the initial batch as it is not on a known chain. We must +//! then backwards sync until we reach a common chain to begin forwarding batch syncing. +//! +//! +//! Parent Lookup +//! +//! When a block with an unknown parent is received and we are in `Regular` sync mode, the block is +//! queued for lookup. A round-robin approach is used to request the parent from the known list of +//! fully sync'd peers. If `PARENT_FAIL_TOLERANCE` attempts at requesting the block fails, we +//! drop the propagated block and downvote the peer that sent it to us. + +use super::simple_sync::{hello_message, NetworkContext, PeerSyncInfo, FUTURE_SLOT_TOLERANCE}; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; use eth2_libp2p::rpc::methods::*; -use eth2_libp2p::rpc::RequestId; +use eth2_libp2p::rpc::{RPCRequest, RequestId}; use eth2_libp2p::PeerId; +use futures::prelude::*; use slog::{debug, info, trace, warn, Logger}; +use smallvec::SmallVec; use std::collections::{HashMap, HashSet}; use std::ops::{Add, Sub}; -use std::sync::Arc; +use std::sync::Weak; +use tokio::sync::{mpsc, oneshot}; use types::{BeaconBlock, EthSpec, Hash256, Slot}; -const MAX_BLOCKS_PER_REQUEST: u64 = 10; +/// Blocks are downloaded in batches from peers. This constant specifies how many blocks per batch +/// is requested. Currently the value is small for testing. This will be incremented for +/// production. +const MAX_BLOCKS_PER_REQUEST: u64 = 50; -/// The number of slots that we can import blocks ahead of us, before going into full Sync mode. +/// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync +/// from a peer. If a peer is within this tolerance (forwards or backwards), it is treated as a +/// fully sync'd peer. const SLOT_IMPORT_TOLERANCE: usize = 10; +/// How many attempts we try to find a parent of a block before we give up trying . const PARENT_FAIL_TOLERANCE: usize = 3; +/// The maximum depth we will search for a parent block. In principle we should have sync'd any +/// canonical chain to its head once the peer connects. A chain should not appear where it's depth +/// is further back than the most recent head slot. const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE * 2; +/// The number of empty batches we tolerate before dropping the peer. This prevents endless +/// requests to peers who never return blocks. +const EMPTY_BATCH_TOLERANCE: usize = 100; + +#[derive(Debug)] +/// A message than can be sent to the sync manager thread. +pub enum SyncMessage { + /// A useful peer has been discovered. + AddPeer(PeerId, PeerSyncInfo), + /// A `BeaconBlocks` response has been received. + BeaconBlocksResponse { + peer_id: PeerId, + request_id: RequestId, + beacon_blocks: Vec>, + }, + /// A `RecentBeaconBlocks` response has been received. + RecentBeaconBlocksResponse { + peer_id: PeerId, + request_id: RequestId, + beacon_blocks: Vec>, + }, + /// A block with an unknown parent has been received. + UnknownBlock(PeerId, BeaconBlock), + /// A peer has disconnected. + Disconnect(PeerId), + /// An RPC Error has occurred on a request. + _RPCError(RequestId), +} #[derive(PartialEq)] +/// The current state of a block or batches lookup. enum BlockRequestsState { + /// The object is queued to be downloaded from a peer but has not yet been requested. Queued, + /// The batch or parent has been requested with the `RequestId` and we are awaiting a response. Pending(RequestId), - Complete, + /// The downloaded blocks are ready to be processed by the beacon chain. For a batch process + /// this means we have found a common chain. + ReadyToProcess, + /// A failure has occurred and we will drop and downvote the peer that caused the request. Failed, } +/// The state of batch requests. +enum SyncDirection { + /// The batch has just been initialised and we need to check to see if a backward sync is + /// required on first batch response. + Initial, + /// We are syncing forwards, the next batch should contain higher slot numbers than is + /// predecessor. + Forwards, + /// We are syncing backwards and looking for a common ancestor chain before we can start + /// processing the downloaded blocks. + Backwards, +} + +/// `BlockRequests` keep track of the long-range (batch) sync process per peer. struct BlockRequests { + /// The peer's head slot and the target of this batch download. target_head_slot: Slot, + /// The peer's head root, used to specify which chain of blocks we are downloading from the + /// blocks. target_head_root: Hash256, + /// The blocks that we have currently downloaded from the peer that are yet to be processed. downloaded_blocks: Vec>, + /// The number of blocks successfully processed in this request. + blocks_processed: usize, + /// The number of empty batches we have consecutively received. If a peer returns more than + /// EMPTY_BATCHES_TOLERANCE, they are dropped. + consecutive_empty_batches: usize, + /// The current state of this batch request. state: BlockRequestsState, - /// Specifies whether the current state is syncing forwards or backwards. - forward_sync: bool, + /// Specifies the current direction of this batch request. + sync_direction: SyncDirection, /// The current `start_slot` of the batched block request. current_start_slot: Slot, } +/// Maintains a sequential list of parents to lookup and the lookup's current state. struct ParentRequests { + /// The blocks that have currently been downloaded. downloaded_blocks: Vec>, + /// The number of failed attempts to retrieve a parent block. If too many attempts occur, this + /// lookup is failed and rejected. failed_attempts: usize, - last_submitted_peer: PeerId, // to downvote the submitting peer. + /// The peer who last submitted a block. If the chain ends or fails, this is the peer that is + /// downvoted. + last_submitted_peer: PeerId, + /// The current state of the parent lookup. state: BlockRequestsState, } impl BlockRequests { - // gets the start slot for next batch - // last block slot downloaded plus 1 + /// Gets the next start slot for a batch and transitions the state to a Queued state. fn update_start_slot(&mut self) { - if self.forward_sync { - self.current_start_slot += Slot::from(MAX_BLOCKS_PER_REQUEST); - } else { - self.current_start_slot -= Slot::from(MAX_BLOCKS_PER_REQUEST); + match self.sync_direction { + SyncDirection::Initial | SyncDirection::Forwards => { + self.current_start_slot += Slot::from(MAX_BLOCKS_PER_REQUEST); + } + SyncDirection::Backwards => { + self.current_start_slot -= Slot::from(MAX_BLOCKS_PER_REQUEST); + } } self.state = BlockRequestsState::Queued; } } #[derive(PartialEq, Debug, Clone)] +/// The current state of the `ImportManager`. enum ManagerState { + /// The manager is performing a long-range (batch) sync. In this mode, parent lookups are + /// disabled. Syncing, + /// The manager is up to date with all known peers and is connected to at least one + /// fully-syncing peer. In this state, parent lookups are enabled. Regular, + /// No useful peers are connected. Long-range sync's cannot proceed and we have no useful + /// peers to download parents for. More peers need to be connected before we can proceed. Stalled, } -pub(crate) enum ImportManagerOutcome { - Idle, - RequestBlocks { - peer_id: PeerId, - request_id: RequestId, - request: BeaconBlocksRequest, - }, - /// Updates information with peer via requesting another HELLO handshake. - Hello(PeerId), - RecentRequest(PeerId, RecentBeaconBlocksRequest), - DownvotePeer(PeerId), -} - -pub struct ImportManager { - /// A reference to the underlying beacon chain. - chain: Arc>, +/// The primary object for handling and driving all the current syncing logic. It maintains the +/// current state of the syncing process, the number of useful peers, downloaded blocks and +/// controls the logic behind both the long-range (batch) sync and the on-going potential parent +/// look-up of blocks. +pub struct SyncManager { + /// A weak reference to the underlying beacon chain. + chain: Weak>, + /// The current state of the import manager. state: ManagerState, + /// A receiving channel sent by the message processor thread. + input_channel: mpsc::UnboundedReceiver>, + /// A network context to contact the network service. + network: NetworkContext, + /// A collection of `BlockRequest` per peer that is currently being downloaded. Used in the + /// long-range (batch) sync process. import_queue: HashMap>, - parent_queue: Vec>, + /// A collection of parent block lookups. + parent_queue: SmallVec<[ParentRequests; 3]>, + /// The collection of known, connected, fully-sync'd peers. full_peers: HashSet, + /// The current request Id. This is used to keep track of responses to various outbound + /// requests. This is an internal accounting mechanism, request id's are never sent to any + /// peers. current_req_id: usize, + /// The logger for the import manager. log: Logger, } -impl ImportManager { - pub fn new(beacon_chain: Arc>, log: &slog::Logger) -> Self { - ImportManager { - chain: beacon_chain.clone(), - state: ManagerState::Regular, - import_queue: HashMap::new(), - parent_queue: Vec::new(), - full_peers: HashSet::new(), - current_req_id: 0, - log: log.clone(), - } - } +/// Spawns a new `SyncManager` thread which has a weak reference to underlying beacon +/// chain. This allows the chain to be +/// dropped during the syncing process which will gracefully end the `SyncManager`. +pub fn spawn( + executor: &tokio::runtime::TaskExecutor, + beacon_chain: Weak>, + network: NetworkContext, + log: slog::Logger, +) -> ( + mpsc::UnboundedSender>, + oneshot::Sender<()>, +) { + // generate the exit channel + let (sync_exit, exit_rx) = tokio::sync::oneshot::channel(); + // generate the message channel + let (sync_send, sync_recv) = mpsc::unbounded_channel::>(); + // create an instance of the SyncManager + let sync_manager = SyncManager { + chain: beacon_chain, + state: ManagerState::Stalled, + input_channel: sync_recv, + network, + import_queue: HashMap::new(), + parent_queue: SmallVec::new(), + full_peers: HashSet::new(), + current_req_id: 0, + log: log.clone(), + }; + + // spawn the sync manager thread + debug!(log, "Sync Manager started"); + executor.spawn( + sync_manager + .select(exit_rx.then(|_| Ok(()))) + .then(move |_| { + info!(log.clone(), "Sync Manager shutdown"); + Ok(()) + }), + ); + (sync_send, sync_exit) +} + +impl SyncManager { + /* Input Handling Functions */ + + /// A peer has connected which has blocks that are unknown to us. + /// + /// This function handles the logic associated with the connection of a new peer. If the peer + /// is sufficiently ahead of our current head, a long-range (batch) sync is started and + /// batches of blocks are queued to download from the peer. Batched blocks begin at our + /// current head. If the resulting downloaded blocks are part of our current chain, we + /// continue with a forward sync. If not, we download blocks (in batches) backwards until we + /// reach a common ancestor. Batches are then processed and downloaded sequentially forwards. + /// + /// If the peer is within the `SLOT_IMPORT_TOLERANCE`, then it's head is sufficiently close to + /// ours that we consider it fully sync'd with respect to our current chain. pub fn add_peer(&mut self, peer_id: PeerId, remote: PeerSyncInfo) { - // TODO: Improve comments. - // initially try to download blocks from our current head - // then backwards search all the way back to our finalized epoch until we match on a chain - // has to be done sequentially to find next slot to start the batch from + // ensure the beacon chain still exists + let chain = match self.chain.upgrade() { + Some(chain) => chain, + None => { + warn!(self.log, + "Beacon chain dropped. Peer not considered for sync"; + "peer_id" => format!("{:?}", peer_id)); + return; + } + }; - let local = PeerSyncInfo::from(&self.chain); + let local = PeerSyncInfo::from(&chain); - // If a peer is within SLOT_IMPORT_TOLERANCE from our head slot, ignore a batch sync + // If a peer is within SLOT_IMPORT_TOLERANCE from our head slot, ignore a batch sync, + // consider it a fully-sync'd peer. if remote.head_slot.sub(local.head_slot).as_usize() < SLOT_IMPORT_TOLERANCE { trace!(self.log, "Ignoring full sync with peer"; "peer" => format!("{:?}", peer_id), @@ -116,34 +312,64 @@ impl ImportManager { ); // remove the peer from the queue if it exists self.import_queue.remove(&peer_id); + self.add_full_peer(peer_id); + // return; } + // Check if the peer is significantly behind us. If within `SLOT_IMPORT_TOLERANCE` + // treat them as a fully synced peer. If not, ignore them in the sync process + if local.head_slot.sub(remote.head_slot).as_usize() < SLOT_IMPORT_TOLERANCE { + self.add_full_peer(peer_id.clone()); + } else { + debug!( + self.log, + "Out of sync peer connected"; + "peer" => format!("{:?}", peer_id), + ); + return; + } + + // Check if we are already downloading blocks from this peer, if so update, if not set up + // a new request structure if let Some(block_requests) = self.import_queue.get_mut(&peer_id) { // update the target head slot if remote.head_slot > block_requests.target_head_slot { block_requests.target_head_slot = remote.head_slot; } } else { + // not already downloading blocks from this peer let block_requests = BlockRequests { target_head_slot: remote.head_slot, // this should be larger than the current head. It is checked in the SyncManager before add_peer is called target_head_root: remote.head_root, + consecutive_empty_batches: 0, downloaded_blocks: Vec::new(), + blocks_processed: 0, state: BlockRequestsState::Queued, - forward_sync: true, - current_start_slot: self.chain.best_slot(), + sync_direction: SyncDirection::Initial, + current_start_slot: chain.best_slot(), }; self.import_queue.insert(peer_id, block_requests); } } + /// A `BeaconBlocks` request has received a response. This function process the response. pub fn beacon_blocks_response( &mut self, peer_id: PeerId, request_id: RequestId, mut blocks: Vec>, ) { - // find the request + // ensure the underlying chain still exists + let chain = match self.chain.upgrade() { + Some(chain) => chain, + None => { + trace!(self.log, "Chain dropped. Sync terminating"); + return; + } + }; + + // find the request associated with this response let block_requests = match self .import_queue .get_mut(&peer_id) @@ -167,10 +393,25 @@ impl ImportManager { if blocks.is_empty() { debug!(self.log, "BeaconBlocks response was empty"; "request_id" => request_id); - block_requests.update_start_slot(); + block_requests.consecutive_empty_batches += 1; + if block_requests.consecutive_empty_batches >= EMPTY_BATCH_TOLERANCE { + warn!(self.log, "Peer returned too many empty block batches"; + "peer" => format!("{:?}", peer_id)); + block_requests.state = BlockRequestsState::Failed; + } else if block_requests.current_start_slot + MAX_BLOCKS_PER_REQUEST + >= block_requests.target_head_slot + { + warn!(self.log, "Peer did not return blocks it claimed to possess"; + "peer" => format!("{:?}", peer_id)); + block_requests.state = BlockRequestsState::Failed; + } else { + block_requests.update_start_slot(); + } return; } + block_requests.consecutive_empty_batches = 0; + // verify the range of received blocks // Note that the order of blocks is verified in block processing let last_sent_slot = blocks[blocks.len() - 1].slot; @@ -180,90 +421,96 @@ impl ImportManager { .add(MAX_BLOCKS_PER_REQUEST) < last_sent_slot { - //TODO: Downvote peer - add a reason to failed - dbg!(&blocks); warn!(self.log, "BeaconBlocks response returned out of range blocks"; "request_id" => request_id, "response_initial_slot" => blocks[0].slot, "requested_initial_slot" => block_requests.current_start_slot); + downvote_peer(&mut self.network, &self.log, peer_id); // consider this sync failed block_requests.state = BlockRequestsState::Failed; return; } // Determine if more blocks need to be downloaded. There are a few cases: - // - We have downloaded a batch from our head_slot, which has not reached the remotes head - // (target head). Therefore we need to download another sequential batch. - // - The latest batch includes blocks that greater than or equal to the target_head slot, - // which means we have caught up to their head. We then check to see if the first - // block downloaded matches our head. If so, we are on the same chain and can process - // the blocks. If not we need to sync back further until we are on the same chain. So - // request more blocks. - // - We are syncing backwards (from our head slot) and need to check if we are on the same - // chain. If so, process the blocks, if not, request more blocks all the way up to - // our last finalized slot. + // - We are in initial sync mode - We have requested blocks and need to determine if this + // is part of a known chain to determine the whether to start syncing backwards or continue + // syncing forwards. + // - We are syncing backwards and need to verify if we have found a common ancestor in + // order to start processing the downloaded blocks. + // - We are syncing forwards. We mark this as complete and check if any further blocks are + // required to download when processing the batch. - if block_requests.forward_sync { - // append blocks if syncing forward - block_requests.downloaded_blocks.append(&mut blocks); - } else { - // prepend blocks if syncing backwards - block_requests.downloaded_blocks.splice(..0, blocks); - } + match block_requests.sync_direction { + SyncDirection::Initial => { + block_requests.downloaded_blocks.append(&mut blocks); - // does the batch contain the target_head_slot - let last_element_index = block_requests.downloaded_blocks.len() - 1; - if block_requests.downloaded_blocks[last_element_index].slot - >= block_requests.target_head_slot - || !block_requests.forward_sync - { - // if the batch is on our chain, this is complete and we can then process. - // Otherwise start backwards syncing until we reach a common chain. - let earliest_slot = block_requests.downloaded_blocks[0].slot; - //TODO: Decide which is faster. Reading block from db and comparing or calculating - //the hash tree root and comparing. - if Some(block_requests.downloaded_blocks[0].canonical_root()) - == root_at_slot(&self.chain, earliest_slot) - { - block_requests.state = BlockRequestsState::Complete; - return; + // this batch is the first batch downloaded. Check if we can process or if we need + // to backwards search. + + //TODO: Decide which is faster. Reading block from db and comparing or calculating + //the hash tree root and comparing. + let earliest_slot = block_requests.downloaded_blocks[0].slot; + if Some(block_requests.downloaded_blocks[0].canonical_root()) + == chain.root_at_slot(earliest_slot) + { + // we have a common head, start processing and begin a forwards sync + block_requests.sync_direction = SyncDirection::Forwards; + block_requests.state = BlockRequestsState::ReadyToProcess; + return; + } + // no common head, begin a backwards search + block_requests.sync_direction = SyncDirection::Backwards; + block_requests.current_start_slot = + std::cmp::min(chain.best_slot(), block_requests.downloaded_blocks[0].slot); + block_requests.update_start_slot(); } - - // not on the same chain, request blocks backwards - let state = &self.chain.head().beacon_state; - let local_finalized_slot = state - .finalized_checkpoint - .epoch - .start_slot(T::EthSpec::slots_per_epoch()); - - // check that the request hasn't failed by having no common chain - if local_finalized_slot >= block_requests.current_start_slot { - warn!(self.log, "Peer returned an unknown chain."; "request_id" => request_id); - block_requests.state = BlockRequestsState::Failed; - return; + SyncDirection::Forwards => { + // continue processing all blocks forwards, verify the end in the processing + block_requests.downloaded_blocks.append(&mut blocks); + block_requests.state = BlockRequestsState::ReadyToProcess; } + SyncDirection::Backwards => { + block_requests.downloaded_blocks.splice(..0, blocks); - // if this is a forward sync, then we have reached the head without a common chain - // and we need to start syncing backwards. - if block_requests.forward_sync { - // Start a backwards sync by requesting earlier blocks - block_requests.forward_sync = false; - block_requests.current_start_slot = std::cmp::min( - self.chain.best_slot(), - block_requests.downloaded_blocks[0].slot, - ); + // verify the request hasn't failed by having no common ancestor chain + // get our local finalized_slot + let local_finalized_slot = { + let state = &chain.head().beacon_state; + state + .finalized_checkpoint + .epoch + .start_slot(T::EthSpec::slots_per_epoch()) + }; + + if local_finalized_slot >= block_requests.current_start_slot { + warn!(self.log, "Peer returned an unknown chain."; "request_id" => request_id); + block_requests.state = BlockRequestsState::Failed; + return; + } + + // check if we have reached a common chain ancestor + let earliest_slot = block_requests.downloaded_blocks[0].slot; + if Some(block_requests.downloaded_blocks[0].canonical_root()) + == chain.root_at_slot(earliest_slot) + { + // we have a common head, start processing and begin a forwards sync + block_requests.sync_direction = SyncDirection::Forwards; + block_requests.state = BlockRequestsState::ReadyToProcess; + return; + } + + // no common chain, haven't passed last_finalized_head, so continue backwards + // search + block_requests.update_start_slot(); } } - - // update the start slot and re-queue the batch - block_requests.update_start_slot(); } pub fn recent_blocks_response( &mut self, peer_id: PeerId, request_id: RequestId, - blocks: Vec>, + mut blocks: Vec>, ) { // find the request let parent_request = match self @@ -298,32 +545,18 @@ impl ImportManager { return; } + // add the block to response + parent_request + .downloaded_blocks + .push(blocks.pop().expect("must exist")); + // queue for processing - parent_request.state = BlockRequestsState::Complete; + parent_request.state = BlockRequestsState::ReadyToProcess; } - pub fn _inject_error(_peer_id: PeerId, _id: RequestId) { - //TODO: Remove block state from pending - } - - pub fn peer_disconnect(&mut self, peer_id: &PeerId) { - self.import_queue.remove(peer_id); - self.full_peers.remove(peer_id); - self.update_state(); - } - - pub fn add_full_peer(&mut self, peer_id: PeerId) { - debug!( - self.log, "Fully synced peer added"; - "peer" => format!("{:?}", peer_id), - ); - self.full_peers.insert(peer_id); - self.update_state(); - } - - pub fn add_unknown_block(&mut self, block: BeaconBlock, peer_id: PeerId) { + fn add_unknown_block(&mut self, peer_id: PeerId, block: BeaconBlock) { // if we are not in regular sync mode, ignore this block - if let ManagerState::Regular = self.state { + if self.state != ManagerState::Regular { return; } @@ -350,38 +583,28 @@ impl ImportManager { self.parent_queue.push(req); } - pub(crate) fn poll(&mut self) -> ImportManagerOutcome { - loop { - // update the state of the manager - self.update_state(); - - // process potential block requests - if let Some(outcome) = self.process_potential_block_requests() { - return outcome; - } - - // process any complete long-range batches - if let Some(outcome) = self.process_complete_batches() { - return outcome; - } - - // process any parent block lookup-requests - if let Some(outcome) = self.process_parent_requests() { - return outcome; - } - - // process any complete parent lookups - let (re_run, outcome) = self.process_complete_parent_requests(); - if let Some(outcome) = outcome { - return outcome; - } else if !re_run { - break; - } - } - - return ImportManagerOutcome::Idle; + fn inject_error(&mut self, _id: RequestId) { + //TODO: Remove block state from pending } + fn peer_disconnect(&mut self, peer_id: &PeerId) { + self.import_queue.remove(peer_id); + self.full_peers.remove(peer_id); + self.update_state(); + } + + fn add_full_peer(&mut self, peer_id: PeerId) { + debug!( + self.log, "Fully synced peer added"; + "peer" => format!("{:?}", peer_id), + ); + self.full_peers.insert(peer_id); + } + + /* Processing State Functions */ + // These functions are called in the main poll function to transition the state of the sync + // manager + fn update_state(&mut self) { let previous_state = self.state.clone(); self.state = { @@ -401,20 +624,22 @@ impl ImportManager { } } - fn process_potential_block_requests(&mut self) -> Option { + fn process_potential_block_requests(&mut self) { // check if an outbound request is required // Managing a fixed number of outbound requests is maintained at the RPC protocol libp2p - // layer and not needed here. - // If any in queued state we submit a request. + // layer and not needed here. Therefore we create many outbound requests and let the RPC + // handle the number of simultaneous requests. Request all queued objects. // remove any failed batches let debug_log = &self.log; + let full_peer_ref = &mut self.full_peers; self.import_queue.retain(|peer_id, block_request| { if let BlockRequestsState::Failed = block_request.state { debug!(debug_log, "Block import from peer failed"; "peer_id" => format!("{:?}", peer_id), - "downloaded_blocks" => block_request.downloaded_blocks.len() + "downloaded_blocks" => block_request.blocks_processed ); + full_peer_ref.remove(peer_id); false } else { true @@ -422,71 +647,101 @@ impl ImportManager { }); // process queued block requests - for (peer_id, block_requests) in self - .import_queue - .iter_mut() - .find(|(_peer_id, req)| req.state == BlockRequestsState::Queued) - { - let request_id = self.current_req_id; - block_requests.state = BlockRequestsState::Pending(request_id); - self.current_req_id += 1; + for (peer_id, block_requests) in self.import_queue.iter_mut() { + { + if block_requests.state == BlockRequestsState::Queued { + let request_id = self.current_req_id; + block_requests.state = BlockRequestsState::Pending(request_id); + self.current_req_id += 1; - let request = BeaconBlocksRequest { - head_block_root: block_requests.target_head_root, - start_slot: block_requests.current_start_slot.as_u64(), - count: MAX_BLOCKS_PER_REQUEST, - step: 0, - }; - return Some(ImportManagerOutcome::RequestBlocks { - peer_id: peer_id.clone(), - request, - request_id, - }); - } - - None - } - - fn process_complete_batches(&mut self) -> Option { - let completed_batches = self - .import_queue - .iter() - .filter(|(_peer, block_requests)| block_requests.state == BlockRequestsState::Complete) - .map(|(peer, _)| peer) - .cloned() - .collect::>(); - for peer_id in completed_batches { - let block_requests = self.import_queue.remove(&peer_id).expect("key exists"); - match self.process_blocks(block_requests.downloaded_blocks.clone()) { - Ok(()) => { - //TODO: Verify it's impossible to have empty downloaded_blocks - let last_element = block_requests.downloaded_blocks.len() - 1; - debug!(self.log, "Blocks processed successfully"; - "peer" => format!("{:?}", peer_id), - "start_slot" => block_requests.downloaded_blocks[0].slot, - "end_slot" => block_requests.downloaded_blocks[last_element].slot, - "no_blocks" => last_element + 1, + let request = BeaconBlocksRequest { + head_block_root: block_requests.target_head_root, + start_slot: block_requests.current_start_slot.as_u64(), + count: MAX_BLOCKS_PER_REQUEST, + step: 0, + }; + request_blocks( + &mut self.network, + &self.log, + peer_id.clone(), + request_id, + request, ); - // Re-HELLO to ensure we are up to the latest head - return Some(ImportManagerOutcome::Hello(peer_id)); - } - Err(e) => { - let last_element = block_requests.downloaded_blocks.len() - 1; - warn!(self.log, "Block processing failed"; - "peer" => format!("{:?}", peer_id), - "start_slot" => block_requests.downloaded_blocks[0].slot, - "end_slot" => block_requests.downloaded_blocks[last_element].slot, - "no_blocks" => last_element + 1, - "error" => format!("{:?}", e), - ); - return Some(ImportManagerOutcome::DownvotePeer(peer_id)); } } } - None } - fn process_parent_requests(&mut self) -> Option { + fn process_complete_batches(&mut self) -> bool { + // This function can queue extra blocks and the main poll loop will need to be re-executed + // to process these. This flag indicates that the main poll loop has to continue. + let mut re_run_poll = false; + + // create reference variables to be moved into subsequent closure + let chain_ref = self.chain.clone(); + let log_ref = &self.log; + let network_ref = &mut self.network; + + self.import_queue.retain(|peer_id, block_requests| { + if block_requests.state == BlockRequestsState::ReadyToProcess { + let downloaded_blocks = + std::mem::replace(&mut block_requests.downloaded_blocks, Vec::new()); + let last_element = downloaded_blocks.len() - 1; + let start_slot = downloaded_blocks[0].slot; + let end_slot = downloaded_blocks[last_element].slot; + + match process_blocks(chain_ref.clone(), downloaded_blocks, log_ref) { + Ok(()) => { + debug!(log_ref, "Blocks processed successfully"; + "peer" => format!("{:?}", peer_id), + "start_slot" => start_slot, + "end_slot" => end_slot, + "no_blocks" => last_element + 1, + ); + block_requests.blocks_processed += last_element + 1; + + // check if the batch is complete, by verifying if we have reached the + // target head + if end_slot >= block_requests.target_head_slot { + // Completed, re-hello the peer to ensure we are up to the latest head + hello_peer(network_ref, log_ref, chain_ref.clone(), peer_id.clone()); + // remove the request + false + } else { + // have not reached the end, queue another batch + block_requests.update_start_slot(); + re_run_poll = true; + // keep the batch + true + } + } + Err(e) => { + warn!(log_ref, "Block processing failed"; + "peer" => format!("{:?}", peer_id), + "start_slot" => start_slot, + "end_slot" => end_slot, + "no_blocks" => last_element + 1, + "error" => format!("{:?}", e), + ); + downvote_peer(network_ref, log_ref, peer_id.clone()); + false + } + } + } else { + // not ready to process + true + } + }); + + re_run_poll + } + + fn process_parent_requests(&mut self) { + // check to make sure there are peers to search for the parent from + if self.full_peers.is_empty() { + return; + } + // remove any failed requests let debug_log = &self.log; self.parent_queue.retain(|parent_request| { @@ -501,11 +756,6 @@ impl ImportManager { } }); - // check to make sure there are peers to search for the parent from - if self.full_peers.is_empty() { - return None; - } - // check if parents need to be searched for for parent_request in self.parent_queue.iter_mut() { if parent_request.failed_attempts >= PARENT_FAIL_TOLERANCE { @@ -518,34 +768,38 @@ impl ImportManager { continue; } - parent_request.state = BlockRequestsState::Pending(self.current_req_id); + let request_id = self.current_req_id; + parent_request.state = BlockRequestsState::Pending(request_id); self.current_req_id += 1; let last_element_index = parent_request.downloaded_blocks.len() - 1; let parent_hash = parent_request.downloaded_blocks[last_element_index].parent_root; - let req = RecentBeaconBlocksRequest { + let request = RecentBeaconBlocksRequest { block_roots: vec![parent_hash], }; // select a random fully synced peer to attempt to download the parent block let peer_id = self.full_peers.iter().next().expect("List is not empty"); - return Some(ImportManagerOutcome::RecentRequest(peer_id.clone(), req)); + recent_blocks_request( + &mut self.network, + &self.log, + peer_id.clone(), + request_id, + request, + ); } } - - None } - fn process_complete_parent_requests(&mut self) -> (bool, Option) { - // flag to determine if there is more process to drive or if the manager can be switched to - // an idle state - let mut re_run = false; + fn process_complete_parent_requests(&mut self) -> bool { + // returned value indicating whether the manager can be switched to idle or not + let mut re_run_poll = false; // Find any parent_requests ready to be processed for completed_request in self .parent_queue .iter_mut() - .filter(|req| req.state == BlockRequestsState::Complete) + .filter(|req| req.state == BlockRequestsState::ReadyToProcess) { // verify the last added block is the parent of the last requested block let last_index = completed_request.downloaded_blocks.len() - 1; @@ -563,7 +817,8 @@ impl ImportManager { "received_block" => format!("{}", block_hash), "expected_parent" => format!("{}", expected_hash), ); - return (true, Some(ImportManagerOutcome::DownvotePeer(peer))); + re_run_poll = true; + downvote_peer(&mut self.network, &self.log, peer); } // try and process the list of blocks up to the requested block @@ -572,72 +827,158 @@ impl ImportManager { .downloaded_blocks .pop() .expect("Block must exist exist"); - match self.chain.process_block(block.clone()) { - Ok(BlockProcessingOutcome::ParentUnknown { parent: _ }) => { - // need to keep looking for parents - completed_request.downloaded_blocks.push(block); - completed_request.state = BlockRequestsState::Queued; - re_run = true; - break; - } - Ok(BlockProcessingOutcome::Processed { block_root: _ }) => {} - Ok(outcome) => { - // it's a future slot or an invalid block, remove it and try again - completed_request.failed_attempts += 1; - trace!( - self.log, "Invalid parent block"; - "outcome" => format!("{:?}", outcome), - "peer" => format!("{:?}", completed_request.last_submitted_peer), - ); - completed_request.state = BlockRequestsState::Queued; - re_run = true; - return ( - re_run, - Some(ImportManagerOutcome::DownvotePeer( + + // check if the chain exists + if let Some(chain) = self.chain.upgrade() { + match chain.process_block(block.clone()) { + Ok(BlockProcessingOutcome::ParentUnknown { parent: _ }) => { + // need to keep looking for parents + completed_request.downloaded_blocks.push(block); + completed_request.state = BlockRequestsState::Queued; + re_run_poll = true; + break; + } + Ok(BlockProcessingOutcome::Processed { block_root: _ }) => {} + Ok(outcome) => { + // it's a future slot or an invalid block, remove it and try again + completed_request.failed_attempts += 1; + trace!( + self.log, "Invalid parent block"; + "outcome" => format!("{:?}", outcome), + "peer" => format!("{:?}", completed_request.last_submitted_peer), + ); + completed_request.state = BlockRequestsState::Queued; + re_run_poll = true; + downvote_peer( + &mut self.network, + &self.log, completed_request.last_submitted_peer.clone(), - )), - ); - } - Err(e) => { - completed_request.failed_attempts += 1; - warn!( - self.log, "Parent processing error"; - "error" => format!("{:?}", e) - ); - completed_request.state = BlockRequestsState::Queued; - re_run = true; - return ( - re_run, - Some(ImportManagerOutcome::DownvotePeer( + ); + return re_run_poll; + } + Err(e) => { + completed_request.failed_attempts += 1; + warn!( + self.log, "Parent processing error"; + "error" => format!("{:?}", e) + ); + completed_request.state = BlockRequestsState::Queued; + re_run_poll = true; + downvote_peer( + &mut self.network, + &self.log, completed_request.last_submitted_peer.clone(), - )), - ); + ); + return re_run_poll; + } } + } else { + // chain doesn't exist - clear the event queue and return + return false; } } } - // remove any full completed and processed parent chains + // remove any fully processed parent chains self.parent_queue.retain(|req| { - if req.state == BlockRequestsState::Complete { + if req.state == BlockRequestsState::ReadyToProcess { false } else { true } }); - (re_run, None) + re_run_poll } +} - fn process_blocks(&mut self, blocks: Vec>) -> Result<(), String> { - for block in blocks { - let processing_result = self.chain.process_block(block.clone()); +/* Network Context Helper Functions */ + +fn hello_peer( + network: &mut NetworkContext, + log: &slog::Logger, + chain: Weak>, + peer_id: PeerId, +) { + trace!( + log, + "RPC Request"; + "method" => "HELLO", + "peer" => format!("{:?}", peer_id) + ); + if let Some(chain) = chain.upgrade() { + network.send_rpc_request(None, peer_id, RPCRequest::Hello(hello_message(&chain))); + } +} + +fn request_blocks( + network: &mut NetworkContext, + log: &slog::Logger, + peer_id: PeerId, + request_id: RequestId, + request: BeaconBlocksRequest, +) { + trace!( + log, + "RPC Request"; + "method" => "BeaconBlocks", + "id" => request_id, + "count" => request.count, + "peer" => format!("{:?}", peer_id) + ); + network.send_rpc_request( + Some(request_id), + peer_id.clone(), + RPCRequest::BeaconBlocks(request), + ); +} + +fn recent_blocks_request( + network: &mut NetworkContext, + log: &slog::Logger, + peer_id: PeerId, + request_id: RequestId, + request: RecentBeaconBlocksRequest, +) { + trace!( + log, + "RPC Request"; + "method" => "RecentBeaconBlocks", + "count" => request.block_roots.len(), + "peer" => format!("{:?}", peer_id) + ); + network.send_rpc_request( + Some(request_id), + peer_id.clone(), + RPCRequest::RecentBeaconBlocks(request), + ); +} + +fn downvote_peer(network: &mut NetworkContext, log: &slog::Logger, peer_id: PeerId) { + trace!( + log, + "Peer downvoted"; + "peer" => format!("{:?}", peer_id) + ); + // TODO: Implement reputation + network.disconnect(peer_id.clone(), GoodbyeReason::Fault); +} + +// Helper function to process blocks which only consumes the chain and blocks to process +fn process_blocks( + weak_chain: Weak>, + blocks: Vec>, + log: &Logger, +) -> Result<(), String> { + for block in blocks { + if let Some(chain) = weak_chain.upgrade() { + let processing_result = chain.process_block(block.clone()); if let Ok(outcome) = processing_result { match outcome { BlockProcessingOutcome::Processed { block_root } => { // The block was valid and we processed it successfully. trace!( - self.log, "Imported block from network"; + log, "Imported block from network"; "slot" => block.slot, "block_root" => format!("{}", block_root), ); @@ -645,7 +986,7 @@ impl ImportManager { BlockProcessingOutcome::ParentUnknown { parent } => { // blocks should be sequential and all parents should exist trace!( - self.log, "ParentBlockUnknown"; + log, "Parent block is unknown"; "parent_root" => format!("{}", parent), "baby_block_slot" => block.slot, ); @@ -654,6 +995,13 @@ impl ImportManager { block.slot )); } + BlockProcessingOutcome::BlockIsAlreadyKnown => { + // this block is already known to us, move to the next + debug!( + log, "Imported a block that is already known"; + "block_slot" => block.slot, + ); + } BlockProcessingOutcome::FutureSlot { present_slot, block_slot, @@ -661,7 +1009,7 @@ impl ImportManager { if present_slot + FUTURE_SLOT_TOLERANCE >= block_slot { // The block is too far in the future, drop it. trace!( - self.log, "FutureBlock"; + log, "Block is ahead of our slot clock"; "msg" => "block for future slot rejected, check your time", "present_slot" => present_slot, "block_slot" => block_slot, @@ -674,8 +1022,7 @@ impl ImportManager { } else { // The block is in the future, but not too far. trace!( - self.log, "QueuedFutureBlock"; - "msg" => "queuing future block, check your time", + log, "Block is slightly ahead of our slot clock, ignoring."; "present_slot" => present_slot, "block_slot" => block_slot, "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, @@ -684,20 +1031,20 @@ impl ImportManager { } BlockProcessingOutcome::WouldRevertFinalizedSlot { .. } => { trace!( - self.log, "Finalized or earlier block processed"; + log, "Finalized or earlier block processed"; "outcome" => format!("{:?}", outcome), ); // block reached our finalized slot or was earlier, move to the next block } BlockProcessingOutcome::GenesisBlock => { trace!( - self.log, "Genesis block was processed"; + log, "Genesis block was processed"; "outcome" => format!("{:?}", outcome), ); } _ => { - trace!( - self.log, "InvalidBlock"; + warn!( + log, "Invalid block received"; "msg" => "peer sent invalid block", "outcome" => format!("{:?}", outcome), ); @@ -705,8 +1052,8 @@ impl ImportManager { } } } else { - trace!( - self.log, "BlockProcessingFailure"; + warn!( + log, "BlockProcessingFailure"; "msg" => "unexpected condition in processing block.", "outcome" => format!("{:?}", processing_result) ); @@ -715,17 +1062,96 @@ impl ImportManager { processing_result )); } + } else { + return Ok(()); // terminate early due to dropped beacon chain } - Ok(()) } + + Ok(()) } -fn root_at_slot( - chain: &Arc>, - target_slot: Slot, -) -> Option { - chain - .rev_iter_block_roots() - .find(|(_root, slot)| *slot == target_slot) - .map(|(root, _slot)| root) +impl Future for SyncManager { + type Item = (); + type Error = String; + + fn poll(&mut self) -> Result, Self::Error> { + // process any inbound messages + loop { + match self.input_channel.poll() { + Ok(Async::Ready(Some(message))) => match message { + SyncMessage::AddPeer(peer_id, info) => { + self.add_peer(peer_id, info); + } + SyncMessage::BeaconBlocksResponse { + peer_id, + request_id, + beacon_blocks, + } => { + self.beacon_blocks_response(peer_id, request_id, beacon_blocks); + } + SyncMessage::RecentBeaconBlocksResponse { + peer_id, + request_id, + beacon_blocks, + } => { + self.recent_blocks_response(peer_id, request_id, beacon_blocks); + } + SyncMessage::UnknownBlock(peer_id, block) => { + self.add_unknown_block(peer_id, block); + } + SyncMessage::Disconnect(peer_id) => { + self.peer_disconnect(&peer_id); + } + SyncMessage::_RPCError(request_id) => { + self.inject_error(request_id); + } + }, + Ok(Async::NotReady) => break, + Ok(Async::Ready(None)) => { + return Err("Sync manager channel closed".into()); + } + Err(e) => { + return Err(format!("Sync Manager channel error: {:?}", e)); + } + } + } + + loop { + //TODO: Optimize the lookups. Potentially keep state of whether each of these functions + //need to be called. + let mut re_run = false; + + // only process batch requests if there are any + if !self.import_queue.is_empty() { + // process potential block requests + self.process_potential_block_requests(); + + // process any complete long-range batches + re_run = re_run || self.process_complete_batches(); + } + + // only process parent objects if we are in regular sync + if !self.parent_queue.is_empty() { + // process any parent block lookup-requests + self.process_parent_requests(); + + // process any complete parent lookups + re_run = re_run || self.process_complete_parent_requests(); + } + + // Shutdown the thread if the chain has termined + if let None = self.chain.upgrade() { + return Ok(Async::Ready(())); + } + + if !re_run { + break; + } + } + + // update the state of the manager + self.update_state(); + + return Ok(Async::NotReady); + } } diff --git a/beacon_node/network/src/sync/mod.rs b/beacon_node/network/src/sync/mod.rs index b26d78c14..58ec386aa 100644 --- a/beacon_node/network/src/sync/mod.rs +++ b/beacon_node/network/src/sync/mod.rs @@ -4,7 +4,7 @@ mod manager; /// Stores the various syncing methods for the beacon chain. mod simple_sync; -pub use simple_sync::SimpleSync; +pub use simple_sync::MessageProcessor; /// Currently implemented sync methods. pub enum SyncMethod { diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index 6745ceb62..c54c481c7 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -1,23 +1,23 @@ -use super::manager::{ImportManager, ImportManagerOutcome}; -use crate::service::{NetworkMessage, OutgoingMessage}; +use super::manager::SyncMessage; +use crate::service::NetworkMessage; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::{RPCEvent, RPCRequest, RPCResponse, RequestId}; use eth2_libp2p::PeerId; use slog::{debug, info, o, trace, warn}; use ssz::Encode; -use std::ops::Sub; use std::sync::Arc; use store::Store; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, oneshot}; use types::{Attestation, BeaconBlock, Epoch, EthSpec, Hash256, Slot}; +//TODO: Put a maximum limit on the number of block that can be requested. +//TODO: Rate limit requests + /// If a block is more than `FUTURE_SLOT_TOLERANCE` slots ahead of our slot clock, we drop it. /// Otherwise we queue it. pub(crate) const FUTURE_SLOT_TOLERANCE: u64 = 1; -/// The number of slots behind our head that we still treat a peer as a fully synced peer. -const FULL_PEER_TOLERANCE: u64 = 10; const SHOULD_FORWARD_GOSSIP_BLOCK: bool = true; const SHOULD_NOT_FORWARD_GOSSIP_BLOCK: bool = false; @@ -49,45 +49,63 @@ impl From<&Arc>> for PeerSyncInfo { } } -/// The current syncing state. -#[derive(PartialEq)] -pub enum SyncState { - _Idle, - _Downloading, - _Stopped, -} - -/// Simple Syncing protocol. -pub struct SimpleSync { +/// Processes validated messages from the network. It relays necessary data to the syncing thread +/// and processes blocks from the pubsub network. +pub struct MessageProcessor { /// A reference to the underlying beacon chain. chain: Arc>, - manager: ImportManager, + /// A channel to the syncing thread. + sync_send: mpsc::UnboundedSender>, + /// A oneshot channel for destroying the sync thread. + _sync_exit: oneshot::Sender<()>, + /// A nextwork context to return and handle RPC requests. network: NetworkContext, + /// The `RPCHandler` logger. log: slog::Logger, } -impl SimpleSync { - /// Instantiate a `SimpleSync` instance, with no peers and an empty queue. +impl MessageProcessor { + /// Instantiate a `MessageProcessor` instance pub fn new( + executor: &tokio::runtime::TaskExecutor, beacon_chain: Arc>, network_send: mpsc::UnboundedSender, log: &slog::Logger, ) -> Self { let sync_logger = log.new(o!("Service"=> "Sync")); + let sync_network_context = NetworkContext::new(network_send.clone(), sync_logger.clone()); - SimpleSync { - chain: beacon_chain.clone(), - manager: ImportManager::new(beacon_chain, log), + // spawn the sync thread + let (sync_send, _sync_exit) = super::manager::spawn( + executor, + Arc::downgrade(&beacon_chain), + sync_network_context, + sync_logger, + ); + + MessageProcessor { + chain: beacon_chain, + sync_send, + _sync_exit, network: NetworkContext::new(network_send, log.clone()), - log: sync_logger, + log: log.clone(), } } + fn send_to_sync(&mut self, message: SyncMessage) { + self.sync_send.try_send(message).unwrap_or_else(|_| { + warn!( + self.log, + "Could not send message to the sync service"; + ) + }); + } + /// Handle a peer disconnect. /// /// Removes the peer from the manager. pub fn on_disconnect(&mut self, peer_id: PeerId) { - self.manager.peer_disconnect(&peer_id); + self.send_to_sync(SyncMessage::Disconnect(peer_id)); } /// Handle the connection of a new peer. @@ -107,6 +125,7 @@ impl SimpleSync { request_id: RequestId, hello: HelloMessage, ) { + // ignore hello responses if we are shutting down trace!(self.log, "HelloRequest"; "peer" => format!("{:?}", peer_id)); // Say hello back. @@ -149,7 +168,7 @@ impl SimpleSync { } else if remote.finalized_epoch <= local.finalized_epoch && remote.finalized_root != Hash256::zero() && local.finalized_root != Hash256::zero() - && (self.root_at_slot(start_slot(remote.finalized_epoch)) + && (self.chain.root_at_slot(start_slot(remote.finalized_epoch)) != Some(remote.finalized_root)) { // The remotes finalized epoch is less than or greater than ours, but the block root is @@ -189,18 +208,16 @@ impl SimpleSync { .exists::>(&remote.head_root) .unwrap_or_else(|_| false) { + trace!( + self.log, "Peer with known chain found"; + "peer" => format!("{:?}", peer_id), + "remote_head_slot" => remote.head_slot, + "remote_latest_finalized_epoch" => remote.finalized_epoch, + ); + // If the node's best-block is already known to us and they are close to our current // head, treat them as a fully sync'd peer. - if self.chain.best_slot().sub(remote.head_slot).as_u64() < FULL_PEER_TOLERANCE { - self.manager.add_full_peer(peer_id); - self.process_sync(); - } else { - debug!( - self.log, - "Out of sync peer connected"; - "peer" => format!("{:?}", peer_id), - ); - } + self.send_to_sync(SyncMessage::AddPeer(peer_id, remote)); } else { // The remote node has an equal or great finalized epoch and we don't know it's head. // @@ -212,87 +229,10 @@ impl SimpleSync { "local_finalized_epoch" => local.finalized_epoch, "remote_latest_finalized_epoch" => remote.finalized_epoch, ); - - self.manager.add_peer(peer_id, remote); - self.process_sync(); + self.send_to_sync(SyncMessage::AddPeer(peer_id, remote)); } } - fn process_sync(&mut self) { - loop { - match self.manager.poll() { - ImportManagerOutcome::Hello(peer_id) => { - trace!( - self.log, - "RPC Request"; - "method" => "HELLO", - "peer" => format!("{:?}", peer_id) - ); - self.network.send_rpc_request( - None, - peer_id, - RPCRequest::Hello(hello_message(&self.chain)), - ); - } - ImportManagerOutcome::RequestBlocks { - peer_id, - request_id, - request, - } => { - trace!( - self.log, - "RPC Request"; - "method" => "BeaconBlocks", - "id" => request_id, - "count" => request.count, - "peer" => format!("{:?}", peer_id) - ); - self.network.send_rpc_request( - Some(request_id), - peer_id.clone(), - RPCRequest::BeaconBlocks(request), - ); - } - ImportManagerOutcome::RecentRequest(peer_id, req) => { - trace!( - self.log, - "RPC Request"; - "method" => "RecentBeaconBlocks", - "count" => req.block_roots.len(), - "peer" => format!("{:?}", peer_id) - ); - self.network.send_rpc_request( - None, - peer_id.clone(), - RPCRequest::RecentBeaconBlocks(req), - ); - } - ImportManagerOutcome::DownvotePeer(peer_id) => { - trace!( - self.log, - "Peer downvoted"; - "peer" => format!("{:?}", peer_id) - ); - // TODO: Implement reputation - self.network - .disconnect(peer_id.clone(), GoodbyeReason::Fault); - } - ImportManagerOutcome::Idle => { - // nothing to do - return; - } - } - } - } - - //TODO: Move to beacon chain - fn root_at_slot(&self, target_slot: Slot) -> Option { - self.chain - .rev_iter_block_roots() - .find(|(_root, slot)| *slot == target_slot) - .map(|(root, _slot)| root) - } - /// Handle a `RecentBeaconBlocks` request from the peer. pub fn on_recent_beacon_blocks_request( &mut self, @@ -321,7 +261,7 @@ impl SimpleSync { debug!( self.log, - "BlockBodiesRequest"; + "RecentBeaconBlocksRequest"; "peer" => format!("{:?}", peer_id), "requested" => request.block_roots.len(), "returned" => blocks.len(), @@ -380,18 +320,16 @@ impl SimpleSync { blocks.reverse(); blocks.dedup_by_key(|brs| brs.slot); - if blocks.len() as u64 != req.count { - debug!( - self.log, - "BeaconBlocksRequest response"; - "peer" => format!("{:?}", peer_id), - "msg" => "Failed to return all requested hashes", - "start_slot" => req.start_slot, - "current_slot" => format!("{:?}", self.chain.slot()), - "requested" => req.count, - "returned" => blocks.len(), - ); - } + debug!( + self.log, + "BeaconBlocksRequest response"; + "peer" => format!("{:?}", peer_id), + "msg" => "Failed to return all requested hashes", + "start_slot" => req.start_slot, + "current_slot" => self.chain.slot().unwrap_or_else(|_| Slot::from(0_u64)).as_u64(), + "requested" => req.count, + "returned" => blocks.len(), + ); self.network.send_rpc_response( peer_id, @@ -414,10 +352,11 @@ impl SimpleSync { "count" => beacon_blocks.len(), ); - self.manager - .beacon_blocks_response(peer_id, request_id, beacon_blocks); - - self.process_sync(); + self.send_to_sync(SyncMessage::BeaconBlocksResponse { + peer_id, + request_id, + beacon_blocks, + }); } /// Handle a `RecentBeaconBlocks` response from the peer. @@ -429,15 +368,16 @@ impl SimpleSync { ) { debug!( self.log, - "BeaconBlocksResponse"; + "RecentBeaconBlocksResponse"; "peer" => format!("{:?}", peer_id), "count" => beacon_blocks.len(), ); - self.manager - .recent_blocks_response(peer_id, request_id, beacon_blocks); - - self.process_sync(); + self.send_to_sync(SyncMessage::RecentBeaconBlocksResponse { + peer_id, + request_id, + beacon_blocks, + }); } /// Process a gossip message declaring a new block. @@ -455,9 +395,9 @@ impl SimpleSync { } BlockProcessingOutcome::ParentUnknown { parent: _ } => { // Inform the sync manager to find parents for this block - trace!(self.log, "Unknown parent gossip"; + trace!(self.log, "Block with unknown parent received"; "peer_id" => format!("{:?}",peer_id)); - self.manager.add_unknown_block(block.clone(), peer_id); + self.send_to_sync(SyncMessage::UnknownBlock(peer_id, block.clone())); SHOULD_FORWARD_GOSSIP_BLOCK } BlockProcessingOutcome::FutureSlot { @@ -468,7 +408,7 @@ impl SimpleSync { SHOULD_FORWARD_GOSSIP_BLOCK } BlockProcessingOutcome::BlockIsAlreadyKnown => SHOULD_FORWARD_GOSSIP_BLOCK, - _ => SHOULD_NOT_FORWARD_GOSSIP_BLOCK, + _ => SHOULD_NOT_FORWARD_GOSSIP_BLOCK, //TODO: Decide if we want to forward these } } else { SHOULD_NOT_FORWARD_GOSSIP_BLOCK @@ -491,15 +431,10 @@ impl SimpleSync { } } } - - /// Generates our current state in the form of a HELLO RPC message. - pub fn generate_hello(&self) -> HelloMessage { - hello_message(&self.chain) - } } /// Build a `HelloMessage` representing the state of the given `beacon_chain`. -fn hello_message(beacon_chain: &BeaconChain) -> HelloMessage { +pub(crate) fn hello_message(beacon_chain: &BeaconChain) -> HelloMessage { let state = &beacon_chain.head().beacon_state; HelloMessage { @@ -527,7 +462,7 @@ impl NetworkContext { pub fn disconnect(&mut self, peer_id: PeerId, reason: GoodbyeReason) { warn!( &self.log, - "Disconnecting peer"; + "Disconnecting peer (RPC)"; "reason" => format!("{:?}", reason), "peer_id" => format!("{:?}", peer_id), ); @@ -560,12 +495,8 @@ impl NetworkContext { } fn send_rpc_event(&mut self, peer_id: PeerId, rpc_event: RPCEvent) { - self.send(peer_id, OutgoingMessage::RPC(rpc_event)) - } - - fn send(&mut self, peer_id: PeerId, outgoing_message: OutgoingMessage) { self.network_send - .try_send(NetworkMessage::Send(peer_id, outgoing_message)) + .try_send(NetworkMessage::RPC(peer_id, rpc_event)) .unwrap_or_else(|_| { warn!( self.log, diff --git a/beacon_node/rest_api/src/helpers.rs b/beacon_node/rest_api/src/helpers.rs index 6b0211662..a21f1831e 100644 --- a/beacon_node/rest_api/src/helpers.rs +++ b/beacon_node/rest_api/src/helpers.rs @@ -2,9 +2,7 @@ use crate::{ApiError, ApiResult}; use beacon_chain::{BeaconChain, BeaconChainTypes}; use bls::PublicKey; use hex; -use hyper::{Body, Request, StatusCode}; -use serde::de::value::StringDeserializer; -use serde_json::Deserializer; +use hyper::{Body, Request}; use store::{iter::AncestorIter, Store}; use types::{BeaconState, EthSpec, Hash256, RelativeEpoch, Slot}; diff --git a/beacon_node/rest_api/src/url_query.rs b/beacon_node/rest_api/src/url_query.rs index e39a9a449..3802ff831 100644 --- a/beacon_node/rest_api/src/url_query.rs +++ b/beacon_node/rest_api/src/url_query.rs @@ -64,7 +64,7 @@ impl<'a> UrlQuery<'a> { /// Returns a vector of all values present where `key` is in `keys /// /// If no match is found, an `InvalidQueryParams` error is returned. - pub fn all_of(mut self, key: &str) -> Result, ApiError> { + pub fn all_of(self, key: &str) -> Result, ApiError> { let queries: Vec<_> = self .0 .filter_map(|(k, v)| { diff --git a/beacon_node/rest_api/src/validator.rs b/beacon_node/rest_api/src/validator.rs index 365b7e552..0440a7368 100644 --- a/beacon_node/rest_api/src/validator.rs +++ b/beacon_node/rest_api/src/validator.rs @@ -5,9 +5,8 @@ use bls::PublicKey; use hyper::{Body, Request}; use serde::{Deserialize, Serialize}; use std::sync::Arc; -use store::Store; use types::beacon_state::EthSpec; -use types::{BeaconBlock, BeaconState, Epoch, RelativeEpoch, Shard, Slot}; +use types::{Epoch, RelativeEpoch, Shard, Slot}; #[derive(Debug, Serialize, Deserialize)] pub struct ValidatorDuty { @@ -61,7 +60,7 @@ pub fn get_validator_duties(req: Request) - )) })?; //TODO: Handle an array of validators, currently only takes one - let mut validators: Vec = match query.all_of("validator_pubkeys") { + let validators: Vec = match query.all_of("validator_pubkeys") { Ok(v) => v .iter() .map(|pk| parse_pubkey(pk)) diff --git a/beacon_node/src/main.rs b/beacon_node/src/main.rs index fab75ea4e..5d2388785 100644 --- a/beacon_node/src/main.rs +++ b/beacon_node/src/main.rs @@ -33,14 +33,14 @@ fn main() { .arg( Arg::with_name("logfile") .long("logfile") - .value_name("logfile") + .value_name("FILE") .help("File path where output will be written.") .takes_value(true), ) .arg( Arg::with_name("network-dir") .long("network-dir") - .value_name("NETWORK-DIR") + .value_name("DIR") .help("Data directory for network keys.") .takes_value(true) .global(true) @@ -83,7 +83,7 @@ fn main() { Arg::with_name("boot-nodes") .long("boot-nodes") .allow_hyphen_values(true) - .value_name("BOOTNODES") + .value_name("ENR-LIST") .help("One or more comma-delimited base64-encoded ENR's to bootstrap the p2p network.") .takes_value(true), ) @@ -128,13 +128,14 @@ fn main() { .arg( Arg::with_name("rpc-address") .long("rpc-address") - .value_name("Address") + .value_name("ADDRESS") .help("Listen address for RPC endpoint.") .takes_value(true), ) .arg( Arg::with_name("rpc-port") .long("rpc-port") + .value_name("PORT") .help("Listen port for RPC endpoint.") .conflicts_with("port-bump") .takes_value(true), @@ -149,14 +150,14 @@ fn main() { .arg( Arg::with_name("api-address") .long("api-address") - .value_name("APIADDRESS") + .value_name("ADDRESS") .help("Set the listen address for the RESTful HTTP API server.") .takes_value(true), ) .arg( Arg::with_name("api-port") .long("api-port") - .value_name("APIPORT") + .value_name("PORT") .help("Set the listen TCP port for the RESTful HTTP API server.") .conflicts_with("port-bump") .takes_value(true), @@ -196,13 +197,6 @@ fn main() { .possible_values(&["info", "debug", "trace", "warn", "error", "crit"]) .default_value("trace"), ) - .arg( - Arg::with_name("verbosity") - .short("v") - .multiple(true) - .help("Sets the verbosity level") - .takes_value(true), - ) /* * The "testnet" sub-command. *