From c0bc45f1f3f35ce66cfa6383745ffb322f7b615b Mon Sep 17 00:00:00 2001 From: Age Manning Date: Tue, 19 Mar 2019 18:28:42 +1100 Subject: [PATCH] Implement node connection validation structure --- beacon_node/network/src/message_handler.rs | 59 ++++++++++++++------- beacon_node/network/src/sync/simple_sync.rs | 42 +++++++++++++-- 2 files changed, 79 insertions(+), 22 deletions(-) diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index f1a114db1..17e74fda2 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -32,9 +32,6 @@ pub struct MessageHandler { network_send: crossbeam_channel::Sender, /// A mapping of peers and the RPC id we have sent an RPC request to. requests: HashMap<(PeerId, u64), Instant>, - /// A mapping of HELLO requests we have sent. We drop/ban peers if they do not response - /// within the timeout - hello_requests: HashMap, /// A counter of request id for each peer. request_ids: HashMap, /// The `MessageHandler` logger. @@ -76,7 +73,6 @@ impl MessageHandler { sync, network_send, requests: HashMap::new(), - hello_requests: HashMap::new(), request_ids: HashMap::new(), log: log.clone(), @@ -99,7 +95,8 @@ impl MessageHandler { match message { // we have initiated a connection to a peer HandlerMessage::PeerDialed(peer_id) => { - self.send_hello_request(peer_id); + let id = self.generate_request_id(&peer_id); + self.send_hello(peer_id, id, true); } // we have received an RPC message request/response HandlerMessage::RPC(peer_id, rpc_event) => { @@ -129,28 +126,41 @@ impl MessageHandler { fn handle_rpc_request(&mut self, peer_id: PeerId, id: u64, request: RPCRequest) { match request { RPCRequest::Hello(hello_message) => { - // self.handle_hello_request(peer_id, id, hello_message) + self.handle_hello_request(peer_id, id, hello_message) } } } /// An RPC response has been received from the network. // we match on id and ignore responses past the timeout. - fn handle_rpc_response(&mut self, peer_id: PeerId, id: u64, response: RPCResponse) {} - - fn handle_hello_response(&mut self, peer_id: PeerId, id: u64, response: HelloMessage) { - if self.hello_requests.remove(&peer_id).is_none() { - // if response id is not in our list, ignore (likely RPC timeout) + fn handle_rpc_response(&mut self, peer_id: PeerId, id: u64, response: RPCResponse) { + // if response id is related to a request, ignore (likely RPC timeout) + if self.requests.remove(&(peer_id, id)).is_none() { return; } + } + fn handle_hello_request(&mut self, peer_id: PeerId, id: u64, hello_message: HelloMessage) { + // send back a HELLO message + self.send_hello(peer_id.clone(), id, false); + // validate the peer + if !self.sync.validate_peer(peer_id.clone(), hello_message) { + debug!( + self.log, + "Peer dropped due to mismatching HELLO messages: {:?}", peer_id + ); + //TODO: block/ban the peer + } + } + + fn handle_hello_response(&mut self, peer_id: PeerId, id: u64, response: HelloMessage) { debug!(self.log, "Hello response received from peer: {:?}", peer_id); // validate peer - decide whether to drop/ban or add to sync // TODO: Peer validation } - /// Sends a HELLO RPC request to a newly connected peer. - fn send_hello_request(&mut self, peer_id: PeerId) { + /// Generates a new request id for a peer. + fn generate_request_id(&mut self, peer_id: &PeerId) -> u64 { // generate a unique id for the peer let id = { let borrowed_id = self.request_ids.entry(peer_id.clone()).or_insert_with(|| 0); @@ -159,18 +169,29 @@ impl MessageHandler { *borrowed_id += 1; id }; - // register RPC Hello request + // register RPC request self.requests.insert((peer_id.clone(), id), Instant::now()); debug!( self.log, "Hello request registered with peer: {:?}", peer_id ); + id + } - // build the rpc request - let rpc_event = RPCEvent::Request { - id, - method_id: RPCMethod::Hello.into(), - body: RPCRequest::Hello(self.sync.generate_hello()), + /// Sends a HELLO RPC request or response to a newly connected peer. + //TODO: The boolean determines if sending request/respond, will be cleaner in the RPC re-write + fn send_hello(&mut self, peer_id: PeerId, id: u64, request: bool) { + let rpc_event = match request { + true => RPCEvent::Request { + id, + method_id: RPCMethod::Hello.into(), + body: RPCRequest::Hello(self.sync.generate_hello()), + }, + false => RPCEvent::Response { + id, + method_id: RPCMethod::Hello.into(), + result: RPCResponse::Hello(self.sync.generate_hello()), + }, }; // send the hello request to the network diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index 336f225b2..2a3cc7089 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -32,15 +32,22 @@ pub struct SimpleSync { state: SyncState, /// The network id, for quick HELLO RPC message lookup. network_id: u8, + /// The latest epoch of the syncing chain. + latest_finalized_epoch: Epoch, + /// The latest block of the syncing chain. + latest_block: Hash256, } impl SimpleSync { pub fn new(beacon_chain: Arc) -> Self { + let state = beacon_chain.get_state(); SimpleSync { + chain: beacon_chain.clone(), known_peers: HashMap::new(), state: SyncState::Idle, network_id: beacon_chain.get_spec().network_id, - chain: beacon_chain, + latest_finalized_epoch: state.finalized_epoch, + latest_block: state.finalized_root, //TODO: Build latest block function into Beacon chain and correct this } } @@ -52,8 +59,37 @@ impl SimpleSync { network_id: self.network_id, latest_finalized_root: state.finalized_root.clone(), latest_finalized_epoch: state.finalized_epoch, - best_root: state.latest_block_roots[0], // 0 or len of vec? - best_slot: state.slot, + best_root: state.latest_block_roots[0], //TODO: build correct value as a beacon chain function + best_slot: state.slot - 1, } } + + pub fn validate_peer(&mut self, peer_id: PeerId, hello_message: HelloMessage) -> bool { + // network id must match + if hello_message.network_id != self.network_id { + return false; + } + // compare latest epoch and finalized root to see if they exist in our chain + if hello_message.latest_finalized_epoch <= self.latest_finalized_epoch { + // ensure their finalized root is in our chain + // TODO: Get the finalized root at hello_message.latest_epoch and ensure they match + //if (hello_message.latest_finalized_root == self.chain.get_state() { + // return false; + // } + } + + // the client is valid, add it to our list of known_peers and request sync if required + // update peer list if peer already exists + let peer_info = PeerSyncInfo { + latest_finalized_root: hello_message.latest_finalized_root, + latest_finalized_epoch: hello_message.latest_finalized_epoch, + best_root: hello_message.best_root, + best_slot: hello_message.best_slot, + }; + + self.known_peers.insert(peer_id, peer_info); + //TODO: Start syncing + + true + } }