From ca18d4390aff4bfbf78d361be65916de9b21325f Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Thu, 21 Mar 2019 17:17:01 +1100 Subject: [PATCH] Add first changes to syncing logic - Adds testing framework - Breaks out new `NetworkContext` object --- beacon_node/beacon_chain/src/lib.rs | 1 + .../beacon_chain/src/test_utils/mod.rs | 3 + .../testing_beacon_chain_builder.rs | 50 +++++ .../test_harness/src/beacon_chain_harness.rs | 11 +- beacon_node/libp2p/Cargo.toml | 1 + beacon_node/libp2p/src/rpc/methods.rs | 28 +++ beacon_node/network/Cargo.toml | 4 + beacon_node/network/src/beacon_chain.rs | 41 ++++ beacon_node/network/src/lib.rs | 4 +- beacon_node/network/src/message_handler.rs | 151 +++++++------- beacon_node/network/src/service.rs | 1 + beacon_node/network/src/sync/simple_sync.rs | 115 ++++++++++- beacon_node/network/tests/tests.rs | 184 ++++++++++++++++++ .../testing_beacon_state_builder.rs | 1 + 14 files changed, 513 insertions(+), 82 deletions(-) create mode 100644 beacon_node/beacon_chain/src/test_utils/mod.rs create mode 100644 beacon_node/beacon_chain/src/test_utils/testing_beacon_chain_builder.rs create mode 100644 beacon_node/network/tests/tests.rs diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 2137c0edf..5fa7e7a77 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -3,6 +3,7 @@ mod beacon_chain; mod checkpoint; mod errors; pub mod initialise; +pub mod test_utils; pub use self::beacon_chain::{BeaconChain, BlockProcessingOutcome, InvalidBlock, ValidBlock}; pub use self::checkpoint::CheckPoint; diff --git a/beacon_node/beacon_chain/src/test_utils/mod.rs b/beacon_node/beacon_chain/src/test_utils/mod.rs new file mode 100644 index 000000000..ad251a3c9 --- /dev/null +++ b/beacon_node/beacon_chain/src/test_utils/mod.rs @@ -0,0 +1,3 @@ +mod testing_beacon_chain_builder; + +pub use testing_beacon_chain_builder::TestingBeaconChainBuilder; diff --git a/beacon_node/beacon_chain/src/test_utils/testing_beacon_chain_builder.rs b/beacon_node/beacon_chain/src/test_utils/testing_beacon_chain_builder.rs new file mode 100644 index 000000000..5c5477e55 --- /dev/null +++ b/beacon_node/beacon_chain/src/test_utils/testing_beacon_chain_builder.rs @@ -0,0 +1,50 @@ +pub use crate::{BeaconChain, BeaconChainError, CheckPoint}; +use db::{ + stores::{BeaconBlockStore, BeaconStateStore}, + MemoryDB, +}; +use fork_choice::BitwiseLMDGhost; +use slot_clock::TestingSlotClock; +use ssz::TreeHash; +use std::sync::Arc; +use types::test_utils::TestingBeaconStateBuilder; +use types::*; + +type TestingBeaconChain = BeaconChain>; + +pub struct TestingBeaconChainBuilder { + state_builder: TestingBeaconStateBuilder, +} + +impl TestingBeaconChainBuilder { + pub fn build(self, spec: &ChainSpec) -> TestingBeaconChain { + let db = Arc::new(MemoryDB::open()); + let block_store = Arc::new(BeaconBlockStore::new(db.clone())); + let state_store = Arc::new(BeaconStateStore::new(db.clone())); + let slot_clock = TestingSlotClock::new(spec.genesis_slot.as_u64()); + let fork_choice = BitwiseLMDGhost::new(block_store.clone(), state_store.clone()); + + let (genesis_state, _keypairs) = self.state_builder.build(); + + let mut genesis_block = BeaconBlock::empty(&spec); + genesis_block.state_root = Hash256::from_slice(&genesis_state.hash_tree_root()); + + // Create the Beacon Chain + BeaconChain::from_genesis( + state_store.clone(), + block_store.clone(), + slot_clock, + genesis_state, + genesis_block, + spec.clone(), + fork_choice, + ) + .unwrap() + } +} + +impl From for TestingBeaconChainBuilder { + fn from(state_builder: TestingBeaconStateBuilder) -> TestingBeaconChainBuilder { + TestingBeaconChainBuilder { state_builder } + } +} diff --git a/beacon_node/beacon_chain/test_harness/src/beacon_chain_harness.rs b/beacon_node/beacon_chain/test_harness/src/beacon_chain_harness.rs index bc5c93b94..800dd3ce6 100644 --- a/beacon_node/beacon_chain/test_harness/src/beacon_chain_harness.rs +++ b/beacon_node/beacon_chain/test_harness/src/beacon_chain_harness.rs @@ -36,14 +36,21 @@ impl BeaconChainHarness { /// - A keypair, `BlockProducer` and `Attester` for each validator. /// - A new BeaconChain struct where the given validators are in the genesis. pub fn new(spec: ChainSpec, validator_count: usize) -> Self { + let state_builder = + TestingBeaconStateBuilder::from_default_keypairs_file_if_exists(validator_count, &spec); + Self::from_beacon_state_builder(state_builder, spec) + } + + pub fn from_beacon_state_builder( + state_builder: TestingBeaconStateBuilder, + spec: ChainSpec, + ) -> Self { let db = Arc::new(MemoryDB::open()); let block_store = Arc::new(BeaconBlockStore::new(db.clone())); let state_store = Arc::new(BeaconStateStore::new(db.clone())); let slot_clock = TestingSlotClock::new(spec.genesis_slot.as_u64()); let fork_choice = BitwiseLMDGhost::new(block_store.clone(), state_store.clone()); - let state_builder = - TestingBeaconStateBuilder::from_default_keypairs_file_if_exists(validator_count, &spec); let (genesis_state, keypairs) = state_builder.build(); let mut genesis_block = BeaconBlock::empty(&spec); diff --git a/beacon_node/libp2p/Cargo.toml b/beacon_node/libp2p/Cargo.toml index dcbc04d0b..e863c4d78 100644 --- a/beacon_node/libp2p/Cargo.toml +++ b/beacon_node/libp2p/Cargo.toml @@ -5,6 +5,7 @@ authors = ["Age Manning "] edition = "2018" [dependencies] +beacon_chain = { path = "../beacon_chain" } # SigP repository until PR is merged libp2p = { git = "https://github.com/SigP/rust-libp2p", branch = "gossipsub" } types = { path = "../../eth2/types" } diff --git a/beacon_node/libp2p/src/rpc/methods.rs b/beacon_node/libp2p/src/rpc/methods.rs index 3014afd0f..f05ade7ff 100644 --- a/beacon_node/libp2p/src/rpc/methods.rs +++ b/beacon_node/libp2p/src/rpc/methods.rs @@ -1,3 +1,4 @@ +use beacon_chain::parking_lot::RwLockReadGuard; /// Available RPC methods types and ids. use ssz_derive::{Decode, Encode}; use types::{BeaconBlockBody, BeaconBlockHeader, Epoch, Hash256, Slot}; @@ -60,6 +61,20 @@ pub enum RPCRequest { BeaconChainState(BeaconChainStateRequest), } +impl RPCRequest { + pub fn method_id(&self) -> u16 { + let method = match self { + RPCRequest::Hello(_) => RPCMethod::Hello, + RPCRequest::Goodbye(_) => RPCMethod::Goodbye, + RPCRequest::BeaconBlockRoots(_) => RPCMethod::BeaconBlockRoots, + RPCRequest::BeaconBlockHeaders(_) => RPCMethod::BeaconBlockHeaders, + RPCRequest::BeaconBlockBodies(_) => RPCMethod::BeaconBlockBodies, + RPCRequest::BeaconChainState(_) => RPCMethod::BeaconChainState, + }; + method.into() + } +} + #[derive(Debug, Clone)] pub enum RPCResponse { Hello(HelloMessage), @@ -69,6 +84,19 @@ pub enum RPCResponse { BeaconChainState(BeaconChainStateResponse), } +impl RPCResponse { + pub fn method_id(&self) -> u16 { + let method = match self { + RPCResponse::Hello(_) => RPCMethod::Hello, + RPCResponse::BeaconBlockRoots(_) => RPCMethod::BeaconBlockRoots, + RPCResponse::BeaconBlockHeaders(_) => RPCMethod::BeaconBlockHeaders, + RPCResponse::BeaconBlockBodies(_) => RPCMethod::BeaconBlockBodies, + RPCResponse::BeaconChainState(_) => RPCMethod::BeaconChainState, + }; + method.into() + } +} + /* Request/Response data structures for RPC methods */ /// The HELLO request/response handshake message. diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index 8b87a9d50..260ee0896 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -4,6 +4,10 @@ version = "0.1.0" authors = ["Age Manning "] edition = "2018" +[dev-dependencies] +test_harness = { path = "../beacon_chain/test_harness" } +sloggers = "0.3.2" + [dependencies] beacon_chain = { path = "../beacon_chain" } libp2p = { path = "../libp2p" } diff --git a/beacon_node/network/src/beacon_chain.rs b/beacon_node/network/src/beacon_chain.rs index 91628cc7e..63e1eb6dd 100644 --- a/beacon_node/network/src/beacon_chain.rs +++ b/beacon_node/network/src/beacon_chain.rs @@ -7,6 +7,8 @@ use beacon_chain::{ types::{BeaconState, ChainSpec}, CheckPoint, }; +use libp2p::HelloMessage; +use types::{Epoch, Hash256, Slot}; /// The network's API to the beacon chain. pub trait BeaconChain: Send + Sync { @@ -14,9 +16,19 @@ pub trait BeaconChain: Send + Sync { fn get_state(&self) -> RwLockReadGuard; + fn slot(&self) -> Slot; + fn head(&self) -> RwLockReadGuard; + fn best_slot(&self) -> Slot; + + fn best_block_root(&self) -> Hash256; + fn finalized_head(&self) -> RwLockReadGuard; + + fn finalized_epoch(&self) -> Epoch; + + fn hello_message(&self) -> HelloMessage; } impl BeaconChain for RawBeaconChain @@ -33,11 +45,40 @@ where self.state.read() } + fn slot(&self) -> Slot { + self.get_state().slot + } + fn head(&self) -> RwLockReadGuard { self.head() } + fn finalized_epoch(&self) -> Epoch { + self.get_state().finalized_epoch + } + fn finalized_head(&self) -> RwLockReadGuard { self.finalized_head() } + + fn best_slot(&self) -> Slot { + self.head().beacon_block.slot + } + + fn best_block_root(&self) -> Hash256 { + self.head().beacon_block_root + } + + fn hello_message(&self) -> HelloMessage { + let spec = self.get_spec(); + let state = self.get_state(); + + HelloMessage { + network_id: spec.network_id, + latest_finalized_root: state.finalized_root, + latest_finalized_epoch: state.finalized_epoch, + best_root: self.best_block_root(), + best_slot: self.best_slot(), + } + } } diff --git a/beacon_node/network/src/lib.rs b/beacon_node/network/src/lib.rs index 1e47b9a73..822b05509 100644 --- a/beacon_node/network/src/lib.rs +++ b/beacon_node/network/src/lib.rs @@ -1,8 +1,8 @@ /// This crate provides the network server for Lighthouse. pub mod beacon_chain; pub mod error; -mod message_handler; -mod service; +pub mod message_handler; +pub mod service; pub mod sync; pub use libp2p::NetworkConfig; diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index 4cd0ab951..8e390b4af 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -5,32 +5,28 @@ use crate::sync::SimpleSync; use crossbeam_channel::{unbounded as channel, Sender}; use futures::future; use libp2p::{ - rpc::{RPCMethod, RPCRequest, RPCResponse}, + rpc::{RPCRequest, RPCResponse}, HelloMessage, PeerId, RPCEvent, }; +use slog::debug; use slog::warn; -use slog::{debug, trace}; use std::collections::HashMap; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Instant; /// Timeout for RPC requests. -const REQUEST_TIMEOUT: Duration = Duration::from_secs(30); +// const REQUEST_TIMEOUT: Duration = Duration::from_secs(30); /// Timeout before banning a peer for non-identification. -const HELLO_TIMEOUT: Duration = Duration::from_secs(30); +// const HELLO_TIMEOUT: Duration = Duration::from_secs(30); /// Handles messages received from the network and client and organises syncing. pub struct MessageHandler { /// Currently loaded and initialised beacon chain. - chain: Arc, + _chain: Arc, /// The syncing framework. sync: SimpleSync, - /// The network channel to relay messages to the Network service. - network_send: crossbeam_channel::Sender, - /// A mapping of peers and the RPC id we have sent an RPC request to. - requests: HashMap<(PeerId, u64), Instant>, - /// A counter of request id for each peer. - request_ids: HashMap, + /// The context required to send messages to, and process messages from peers. + network_context: NetworkContext, /// The `MessageHandler` logger. log: slog::Logger, } @@ -65,13 +61,9 @@ impl MessageHandler { let sync = SimpleSync::new(beacon_chain.clone(), &log); let mut handler = MessageHandler { - // TODO: The handler may not need a chain, perhaps only sync? - chain: beacon_chain.clone(), + _chain: beacon_chain.clone(), sync, - network_send, - requests: HashMap::new(), - request_ids: HashMap::new(), - + network_context: NetworkContext::new(network_send, log.clone()), log: log.clone(), }; @@ -93,8 +85,7 @@ impl MessageHandler { match message { // we have initiated a connection to a peer HandlerMessage::PeerDialed(peer_id) => { - let id = self.generate_request_id(&peer_id); - self.send_hello(peer_id, id, true); + self.sync.on_connect(&peer_id, &mut self.network_context); } // we have received an RPC message request/response HandlerMessage::RPC(peer_id, rpc_event) => { @@ -118,9 +109,11 @@ impl MessageHandler { /// A new RPC request has been received from the network. fn handle_rpc_request(&mut self, peer_id: PeerId, id: u64, request: RPCRequest) { + // TODO: ensure the id is legit match request { RPCRequest::Hello(hello_message) => { - self.handle_hello_request(peer_id, id, hello_message) + self.sync + .on_hello(&peer_id, hello_message, &mut self.network_context) } // TODO: Handle all requests _ => {} @@ -131,7 +124,12 @@ impl MessageHandler { // we match on id and ignore responses past the timeout. fn handle_rpc_response(&mut self, peer_id: PeerId, id: u64, response: RPCResponse) { // if response id is related to a request, ignore (likely RPC timeout) - if self.requests.remove(&(peer_id.clone(), id)).is_none() { + if self + .network_context + .requests + .remove(&(peer_id.clone(), id)) + .is_none() + { debug!(self.log, "Unrecognized response from peer: {:?}", peer_id); return; } @@ -145,16 +143,10 @@ impl MessageHandler { } } - /// Handle a HELLO RPC request message. - fn handle_hello_request(&mut self, peer_id: PeerId, id: u64, hello_message: HelloMessage) { - // send back a HELLO message - self.send_hello(peer_id.clone(), id, false); - // validate the peer - self.validate_hello(peer_id, hello_message); - } - /// Validate a HELLO RPC message. fn validate_hello(&mut self, peer_id: PeerId, message: HelloMessage) { + self.sync + .on_hello(&peer_id, message.clone(), &mut self.network_context); // validate the peer if !self.sync.validate_peer(peer_id.clone(), message) { debug!( @@ -164,8 +156,68 @@ impl MessageHandler { //TODO: block/ban the peer } } +} - /* General RPC helper functions */ +pub struct NetworkContext { + /// The network channel to relay messages to the Network service. + network_send: crossbeam_channel::Sender, + /// A mapping of peers and the RPC id we have sent an RPC request to. + requests: HashMap<(PeerId, u64), Instant>, + /// A counter of request id for each peer. + request_ids: HashMap, + /// The `MessageHandler` logger. + log: slog::Logger, +} + +impl NetworkContext { + pub fn new(network_send: crossbeam_channel::Sender, log: slog::Logger) -> Self { + Self { + network_send, + requests: HashMap::new(), + request_ids: HashMap::new(), + log, + } + } + + pub fn send_rpc_request(&mut self, peer_id: PeerId, rpc_request: RPCRequest) { + let id = self.generate_request_id(&peer_id); + self.send_rpc_event( + peer_id, + RPCEvent::Request { + id, + method_id: rpc_request.method_id(), + body: rpc_request, + }, + ); + } + + pub fn send_rpc_response(&mut self, peer_id: PeerId, rpc_response: RPCResponse) { + let id = self.generate_request_id(&peer_id); + self.send_rpc_event( + peer_id, + RPCEvent::Response { + id, + method_id: rpc_response.method_id(), + result: rpc_response, + }, + ); + } + + fn send_rpc_event(&self, peer_id: PeerId, rpc_event: RPCEvent) { + self.send(peer_id, OutgoingMessage::RPC(rpc_event)) + } + + fn send(&self, peer_id: PeerId, outgoing_message: OutgoingMessage) { + self.network_send + .send(NetworkMessage::Send(peer_id, outgoing_message)) + .unwrap_or_else(|_| { + warn!( + self.log, + "Could not send RPC message to the network service" + ) + }); + // + } /// Generates a new request id for a peer. fn generate_request_id(&mut self, peer_id: &PeerId) -> u64 { @@ -185,41 +237,4 @@ impl MessageHandler { ); id } - - /// Sends a HELLO RPC request or response to a newly connected peer. - //TODO: The boolean determines if sending request/respond, will be cleaner in the RPC re-write - fn send_hello(&mut self, peer_id: PeerId, id: u64, is_request: bool) { - let rpc_event = if is_request { - RPCEvent::Request { - id, - method_id: RPCMethod::Hello.into(), - body: RPCRequest::Hello(self.sync.generate_hello()), - } - } else { - RPCEvent::Response { - id, - method_id: RPCMethod::Hello.into(), - result: RPCResponse::Hello(self.sync.generate_hello()), - } - }; - - // send the hello request to the network - trace!(self.log, "Sending HELLO message to peer {:?}", peer_id); - self.send_rpc(peer_id, rpc_event); - } - - /// Sends an RPC request/response to the network server. - fn send_rpc(&self, peer_id: PeerId, rpc_event: RPCEvent) { - self.network_send - .send(NetworkMessage::Send( - peer_id, - OutgoingMessage::RPC(rpc_event), - )) - .unwrap_or_else(|_| { - warn!( - self.log, - "Could not send RPC message to the network service" - ) - }); - } } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index c3045d280..d01188f5d 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -6,6 +6,7 @@ use crossbeam_channel::{unbounded as channel, Sender, TryRecvError}; use futures::prelude::*; use futures::sync::oneshot; use futures::Stream; +use libp2p::rpc::RPCResponse; use libp2p::RPCEvent; use libp2p::Service as LibP2PService; use libp2p::{Libp2pEvent, PeerId}; diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index 95c7092c3..ae6a9e7a1 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -1,11 +1,16 @@ use crate::beacon_chain::BeaconChain; -use libp2p::rpc::HelloMessage; +use crate::message_handler::{MessageHandler, NetworkContext}; +use crate::service::NetworkMessage; +use crossbeam_channel::Sender; +use libp2p::rpc::{HelloMessage, RPCMethod, RPCRequest, RPCResponse}; use libp2p::PeerId; use slog::{debug, o}; use std::collections::HashMap; use std::sync::Arc; use types::{Epoch, Hash256, Slot}; +type NetworkSender = Sender; + /// The number of slots that we can import blocks ahead of us, before going into full Sync mode. const SLOT_IMPORT_TOLERANCE: u64 = 100; @@ -17,6 +22,32 @@ pub struct PeerSyncInfo { best_slot: Slot, } +impl PeerSyncInfo { + pub fn is_on_chain(&self, chain: &Arc) -> bool { + // TODO: make useful. + true + } + + pub fn has_higher_finalized_epoch(&self, chain: &Arc) -> bool { + self.latest_finalized_epoch > chain.get_state().finalized_epoch + } + + pub fn has_higher_best_slot(&self, chain: &Arc) -> bool { + self.latest_finalized_epoch > chain.get_state().finalized_epoch + } +} + +impl From for PeerSyncInfo { + fn from(hello: HelloMessage) -> PeerSyncInfo { + PeerSyncInfo { + latest_finalized_root: hello.latest_finalized_root, + latest_finalized_epoch: hello.latest_finalized_epoch, + best_root: hello.best_root, + best_slot: hello.best_slot, + } + } +} + /// The current syncing state. #[derive(PartialEq)] pub enum SyncState { @@ -60,17 +91,81 @@ impl SimpleSync { } } + pub fn on_connect(&self, peer_id: &PeerId, network: &mut NetworkContext) { + network.send_rpc_request( + peer_id.clone(), + RPCRequest::Hello(self.chain.hello_message()), + ); + } + + pub fn on_hello_request( + &self, + peer_id: &PeerId, + hello: HelloMessage, + network: &mut NetworkContext, + ) { + network.send_rpc_response( + peer_id.clone(), + RPCResponse::Hello(self.chain.hello_message()), + ); + self.on_hello(peer_id, hello, network); + } + + pub fn on_hello(&self, peer_id: &PeerId, hello: HelloMessage, network: &mut NetworkContext) { + // network id must match + if hello.network_id != self.network_id { + debug!(self.log, "Bad network id. Peer: {:?}", peer_id); + return; + } + + let peer = PeerSyncInfo::from(hello); + + /* + if peer.has_higher_finalized_epoch(&self.chain) { + // we need blocks + let peer_slot = peer.latest_finalized_epoch.start_slot(spec.slots_per_epoch); + let our_slot = self.chain.finalized_epoch(); + let required_slots = peer_slot - our_slot; + } else { + if !peer.is_on_chain(&self.chain) { + return (true, responses); + } + // + } + */ + + /* + // compare latest epoch and finalized root to see if they exist in our chain + if peer_info.latest_finalized_epoch <= self.latest_finalized_epoch { + // ensure their finalized root is in our chain + // TODO: Get the finalized root at hello_message.latest_epoch and ensure they match + //if (hello_message.latest_finalized_root == self.chain.get_state() { + // return false; + // } + } + + // the client is valid, add it to our list of known_peers and request sync if required + // update peer list if peer already exists + let peer_info = PeerSyncInfo::from(hello); + + debug!(self.log, "Handshake successful. Peer: {:?}", peer_id); + self.known_peers.insert(peer_id, peer_info); + + // set state to sync + if self.state == SyncState::Idle + && hello_message.best_slot > self.latest_slot + SLOT_IMPORT_TOLERANCE + { + self.state = SyncState::Downloading; + //TODO: Start requesting blocks from known peers. Ideally in batches + } + + true + */ + } + /// Generates our current state in the form of a HELLO RPC message. pub fn generate_hello(&self) -> HelloMessage { - let state = &self.chain.get_state(); - //TODO: Paul to verify the logic of these fields. - HelloMessage { - network_id: self.network_id, - latest_finalized_root: state.finalized_root, - latest_finalized_epoch: state.finalized_epoch, - best_root: state.latest_block_roots[0], //TODO: build correct value as a beacon chain function - best_slot: state.slot - 1, - } + self.chain.hello_message() } pub fn validate_peer(&mut self, peer_id: PeerId, hello_message: HelloMessage) -> bool { diff --git a/beacon_node/network/tests/tests.rs b/beacon_node/network/tests/tests.rs new file mode 100644 index 000000000..dc0832fed --- /dev/null +++ b/beacon_node/network/tests/tests.rs @@ -0,0 +1,184 @@ +use beacon_chain::test_utils::TestingBeaconChainBuilder; +use crossbeam_channel::{unbounded, Receiver, Sender}; +use libp2p::rpc::{HelloMessage, RPCMethod, RPCRequest, RPCResponse}; +use libp2p::{PeerId, RPCEvent}; +use network::beacon_chain::BeaconChain as NetworkBeaconChain; +use network::message_handler::{HandlerMessage, MessageHandler}; +use network::service::{NetworkMessage, OutgoingMessage}; +use sloggers::terminal::{Destination, TerminalLoggerBuilder}; +use sloggers::types::Severity; +use sloggers::Build; +use std::sync::Arc; +use test_harness::BeaconChainHarness; +use tokio::runtime::TaskExecutor; +use types::{test_utils::TestingBeaconStateBuilder, *}; + +pub struct SyncNode { + pub id: usize, + sender: Sender, + receiver: Receiver, +} + +impl SyncNode { + pub fn new( + id: usize, + executor: &TaskExecutor, + chain: Arc, + logger: slog::Logger, + ) -> Self { + let (network_sender, network_receiver) = unbounded(); + let message_handler_sender = + MessageHandler::spawn(chain, network_sender, executor, logger).unwrap(); + + Self { + id, + sender: message_handler_sender, + receiver: network_receiver, + } + } + + fn send(&self, message: HandlerMessage) { + self.sender.send(message).unwrap(); + } + + fn recv(&self) -> NetworkMessage { + self.receiver.recv().unwrap() + } + + fn recv_rpc_response(&self) -> RPCResponse { + let network_message = self.recv(); + match network_message { + NetworkMessage::Send( + _peer_id, + OutgoingMessage::RPC(RPCEvent::Response { + id: _, + method_id: _, + result, + }), + ) => result, + _ => panic!("get_rpc_response failed! got {:?}", network_message), + } + } + + fn recv_rpc_request(&self) -> RPCRequest { + let network_message = self.recv(); + match network_message { + NetworkMessage::Send( + _peer_id, + OutgoingMessage::RPC(RPCEvent::Request { + id: _, + method_id: _, + body, + }), + ) => body, + _ => panic!("get_rpc_request failed! got {:?}", network_message), + } + } +} + +fn get_logger() -> slog::Logger { + let mut builder = TerminalLoggerBuilder::new(); + builder.level(Severity::Debug); + builder.destination(Destination::Stderr); + builder.build().unwrap() +} + +pub struct SyncMaster { + harness: BeaconChainHarness, + peer_id: PeerId, + response_ids: Vec, +} + +impl SyncMaster { + fn from_beacon_state_builder( + state_builder: TestingBeaconStateBuilder, + node_count: usize, + spec: &ChainSpec, + ) -> Self { + let harness = BeaconChainHarness::from_beacon_state_builder(state_builder, spec.clone()); + let peer_id = PeerId::random(); + let response_ids = vec![0; node_count]; + + Self { + harness, + peer_id, + response_ids, + } + } + + pub fn response_id(&mut self, node: &SyncNode) -> u64 { + let id = self.response_ids[node.id]; + self.response_ids[node.id] += 1; + id + } + + pub fn do_hello_with(&mut self, node: &SyncNode) { + let message = HandlerMessage::PeerDialed(self.peer_id.clone()); + node.send(message); + + let request = node.recv_rpc_request(); + + match request { + RPCRequest::Hello(_hello) => { + let hello = self.harness.beacon_chain.hello_message(); + let response = self.rpc_response(node, RPCResponse::Hello(hello)); + node.send(response); + } + _ => panic!("Got message other than hello from node."), + } + } + + fn rpc_response(&mut self, node: &SyncNode, rpc_response: RPCResponse) -> HandlerMessage { + HandlerMessage::RPC( + self.peer_id.clone(), + RPCEvent::Response { + id: self.response_id(node), + method_id: RPCMethod::Hello.into(), + result: rpc_response, + }, + ) + } +} + +fn test_setup( + state_builder: TestingBeaconStateBuilder, + node_count: usize, + spec: &ChainSpec, + logger: slog::Logger, +) -> (tokio::runtime::Runtime, SyncMaster, Vec) { + let runtime = tokio::runtime::Runtime::new().unwrap(); + + let mut nodes = Vec::with_capacity(node_count); + for id in 0..node_count { + let local_chain = TestingBeaconChainBuilder::from(state_builder.clone()).build(&spec); + let node = SyncNode::new( + id, + &runtime.executor(), + Arc::new(local_chain), + logger.clone(), + ); + + nodes.push(node); + } + + let master = SyncMaster::from_beacon_state_builder(state_builder, node_count, &spec); + + (runtime, master, nodes) +} + +#[test] +fn first_test() { + let logger = get_logger(); + let spec = ChainSpec::few_validators(); + let validator_count = 8; + let node_count = 1; + + let state_builder = + TestingBeaconStateBuilder::from_default_keypairs_file_if_exists(validator_count, &spec); + + let (runtime, mut master, nodes) = test_setup(state_builder, node_count, &spec, logger.clone()); + + master.do_hello_with(&nodes[0]); + + runtime.shutdown_now(); +} diff --git a/eth2/types/src/test_utils/testing_beacon_state_builder.rs b/eth2/types/src/test_utils/testing_beacon_state_builder.rs index e76a01e49..7a7a902de 100644 --- a/eth2/types/src/test_utils/testing_beacon_state_builder.rs +++ b/eth2/types/src/test_utils/testing_beacon_state_builder.rs @@ -23,6 +23,7 @@ pub fn keypairs_path() -> PathBuf { /// Builds a beacon state to be used for testing purposes. /// /// This struct should **never be used for production purposes.** +#[derive(Clone)] pub struct TestingBeaconStateBuilder { state: BeaconState, keypairs: Vec,