Pass first sync test

This commit is contained in:
Paul Hauner 2019-03-22 11:39:16 +11:00
parent 4fc6e435d2
commit f96a3282b5
No known key found for this signature in database
GPG Key ID: D362883A9218FCC6
2 changed files with 81 additions and 66 deletions

View File

@ -1,7 +1,5 @@
use crate::beacon_chain::BeaconChain; use crate::beacon_chain::BeaconChain;
use crate::message_handler::NetworkContext; use crate::message_handler::NetworkContext;
use crate::service::NetworkMessage;
use crossbeam_channel::Sender;
use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::methods::*;
use eth2_libp2p::rpc::{RPCRequest, RPCResponse}; use eth2_libp2p::rpc::{RPCRequest, RPCResponse};
use eth2_libp2p::PeerId; use eth2_libp2p::PeerId;
@ -10,14 +8,13 @@ use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use types::{Epoch, Hash256, Slot}; use types::{Epoch, Hash256, Slot};
type NetworkSender = Sender<NetworkMessage>;
/// The number of slots that we can import blocks ahead of us, before going into full Sync mode. /// The number of slots that we can import blocks ahead of us, before going into full Sync mode.
const SLOT_IMPORT_TOLERANCE: u64 = 100; const SLOT_IMPORT_TOLERANCE: u64 = 100;
/// Keeps track of syncing information for known connected peers. /// Keeps track of syncing information for known connected peers.
#[derive(Clone, Copy)] #[derive(Clone, Copy, Debug)]
pub struct PeerSyncInfo { pub struct PeerSyncInfo {
network_id: u8,
latest_finalized_root: Hash256, latest_finalized_root: Hash256,
latest_finalized_epoch: Epoch, latest_finalized_epoch: Epoch,
best_root: Hash256, best_root: Hash256,
@ -25,25 +22,24 @@ pub struct PeerSyncInfo {
} }
impl PeerSyncInfo { impl PeerSyncInfo {
fn is_on_chain(&self, chain: &Arc<BeaconChain>) -> bool { fn is_on_same_chain(&self, other: Self) -> bool {
// TODO: make useful. self.network_id == other.network_id
true
} }
fn has_higher_finalized_epoch(&self, chain: &Arc<BeaconChain>) -> bool { fn has_higher_finalized_epoch_than(&self, other: Self) -> bool {
self.latest_finalized_epoch > chain.get_state().finalized_epoch self.latest_finalized_epoch > other.latest_finalized_epoch
} }
fn has_higher_best_slot(&self, chain: &Arc<BeaconChain>) -> bool { fn has_higher_best_slot_than(&self, other: Self) -> bool {
self.latest_finalized_epoch > chain.get_state().finalized_epoch self.best_slot > other.best_slot
} }
pub fn status(&self, chain: &Arc<BeaconChain>) -> PeerStatus { pub fn status_compared_to(&self, other: Self) -> PeerStatus {
if self.has_higher_finalized_epoch(chain) { if self.has_higher_finalized_epoch_than(other) {
PeerStatus::HigherFinalizedEpoch PeerStatus::HigherFinalizedEpoch
} else if !self.is_on_chain(chain) { } else if !self.is_on_same_chain(other) {
PeerStatus::HigherFinalizedEpoch PeerStatus::OnDifferentChain
} else if self.has_higher_best_slot(chain) { } else if self.has_higher_best_slot_than(other) {
PeerStatus::HigherBestSlot PeerStatus::HigherBestSlot
} else { } else {
PeerStatus::NotInteresting PeerStatus::NotInteresting
@ -62,6 +58,7 @@ pub enum PeerStatus {
impl From<HelloMessage> for PeerSyncInfo { impl From<HelloMessage> for PeerSyncInfo {
fn from(hello: HelloMessage) -> PeerSyncInfo { fn from(hello: HelloMessage) -> PeerSyncInfo {
PeerSyncInfo { PeerSyncInfo {
network_id: hello.network_id,
latest_finalized_root: hello.latest_finalized_root, latest_finalized_root: hello.latest_finalized_root,
latest_finalized_epoch: hello.latest_finalized_epoch, latest_finalized_epoch: hello.latest_finalized_epoch,
best_root: hello.best_root, best_root: hello.best_root,
@ -70,6 +67,12 @@ impl From<HelloMessage> for PeerSyncInfo {
} }
} }
impl From<&Arc<BeaconChain>> for PeerSyncInfo {
fn from(chain: &Arc<BeaconChain>) -> PeerSyncInfo {
Self::from(chain.hello_message())
}
}
/// The current syncing state. /// The current syncing state.
#[derive(PartialEq)] #[derive(PartialEq)]
pub enum SyncState { pub enum SyncState {
@ -88,12 +91,6 @@ pub struct SimpleSync {
known_peers: HashMap<PeerId, PeerSyncInfo>, known_peers: HashMap<PeerId, PeerSyncInfo>,
/// The current state of the syncing protocol. /// The current state of the syncing protocol.
state: SyncState, state: SyncState,
/// The network id, for quick HELLO RPC message lookup.
network_id: u8,
/// The latest epoch of the syncing chain.
latest_finalized_epoch: Epoch,
/// The latest block of the syncing chain.
latest_slot: Slot,
/// Sync logger. /// Sync logger.
log: slog::Logger, log: slog::Logger,
} }
@ -106,9 +103,6 @@ impl SimpleSync {
chain: beacon_chain.clone(), chain: beacon_chain.clone(),
known_peers: HashMap::new(), known_peers: HashMap::new(),
state: SyncState::Idle, state: SyncState::Idle,
network_id: beacon_chain.get_spec().network_id,
latest_finalized_epoch: state.finalized_epoch,
latest_slot: state.slot - 1, //TODO: Build latest block function into Beacon chain and correct this
log: sync_logger, log: sync_logger,
} }
} }
@ -133,40 +127,39 @@ impl SimpleSync {
pub fn on_hello(&mut self, peer_id: PeerId, hello: HelloMessage, network: &mut NetworkContext) { pub fn on_hello(&mut self, peer_id: PeerId, hello: HelloMessage, network: &mut NetworkContext) {
let spec = self.chain.get_spec(); let spec = self.chain.get_spec();
let remote = PeerSyncInfo::from(hello);
let local = PeerSyncInfo::from(&self.chain);
let remote_status = remote.status_compared_to(local);
// network id must match // network id must match
if hello.network_id != self.network_id { if remote_status != PeerStatus::OnDifferentChain {
debug!(self.log, "Bad network id. Peer: {:?}", peer_id); debug!(self.log, "Handshake successful. Peer: {:?}", peer_id);
network.disconnect(peer_id); self.known_peers.insert(peer_id.clone(), remote);
return;
} }
let peer = PeerSyncInfo::from(hello); match remote_status {
debug!(self.log, "Handshake successful. Peer: {:?}", peer_id);
self.known_peers.insert(peer_id.clone(), peer);
debug!(
self.log,
"Peer hello. Status: {:?}",
peer.status(&self.chain)
);
match peer.status(&self.chain) {
PeerStatus::OnDifferentChain => { PeerStatus::OnDifferentChain => {
debug!(self.log, "Peer is on different chain. Peer: {:?}", peer_id); debug!(self.log, "Peer is on different chain. Peer: {:?}", peer_id);
network.disconnect(peer_id); network.disconnect(peer_id);
} }
PeerStatus::HigherFinalizedEpoch => { PeerStatus::HigherFinalizedEpoch => {
let start_slot = peer.latest_finalized_epoch.start_slot(spec.slots_per_epoch); let start_slot = remote
let required_slots = start_slot - self.chain.slot(); .latest_finalized_epoch
.start_slot(spec.slots_per_epoch);
let required_slots = start_slot - local.best_slot;
self.request_block_roots(peer_id, start_slot, required_slots.as_u64(), network); self.request_block_roots(peer_id, start_slot, required_slots.as_u64(), network);
} }
PeerStatus::HigherBestSlot => { PeerStatus::HigherBestSlot => {
let start_slot = peer.best_slot; let required_slots = remote.best_slot - local.best_slot;
let required_slots = start_slot - self.chain.slot();
self.request_block_roots(peer_id, start_slot, required_slots.as_u64(), network); self.request_block_roots(
peer_id,
local.best_slot,
required_slots.as_u64(),
network,
);
} }
PeerStatus::NotInteresting => {} PeerStatus::NotInteresting => {}
} }

View File

@ -1,4 +1,3 @@
use beacon_chain::test_utils::TestingBeaconChainBuilder;
use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}; use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender};
use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::methods::*;
use eth2_libp2p::rpc::{RPCMethod, RPCRequest, RPCResponse}; use eth2_libp2p::rpc::{RPCMethod, RPCRequest, RPCResponse};
@ -9,7 +8,6 @@ use network::service::{NetworkMessage, OutgoingMessage};
use sloggers::terminal::{Destination, TerminalLoggerBuilder}; use sloggers::terminal::{Destination, TerminalLoggerBuilder};
use sloggers::types::Severity; use sloggers::types::Severity;
use sloggers::Build; use sloggers::Build;
use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use test_harness::BeaconChainHarness; use test_harness::BeaconChainHarness;
use tokio::runtime::TaskExecutor; use tokio::runtime::TaskExecutor;
@ -19,26 +17,40 @@ pub struct SyncNode {
pub id: usize, pub id: usize,
sender: Sender<HandlerMessage>, sender: Sender<HandlerMessage>,
receiver: Receiver<NetworkMessage>, receiver: Receiver<NetworkMessage>,
harness: BeaconChainHarness,
} }
impl SyncNode { impl SyncNode {
pub fn new( fn from_beacon_state_builder(
id: usize, id: usize,
executor: &TaskExecutor, executor: &TaskExecutor,
chain: Arc<NetworkBeaconChain>, state_builder: TestingBeaconStateBuilder,
spec: &ChainSpec,
logger: slog::Logger, logger: slog::Logger,
) -> Self { ) -> Self {
let harness = BeaconChainHarness::from_beacon_state_builder(state_builder, spec.clone());
let (network_sender, network_receiver) = unbounded(); let (network_sender, network_receiver) = unbounded();
let message_handler_sender = let message_handler_sender = MessageHandler::spawn(
MessageHandler::spawn(chain, network_sender, executor, logger).unwrap(); harness.beacon_chain.clone(),
network_sender,
executor,
logger,
)
.unwrap();
Self { Self {
id, id,
sender: message_handler_sender, sender: message_handler_sender,
receiver: network_receiver, receiver: network_receiver,
harness,
} }
} }
fn increment_beacon_chain_slot(&mut self) {
self.harness.increment_beacon_chain_slot();
}
fn send(&self, message: HandlerMessage) { fn send(&self, message: HandlerMessage) {
self.sender.send(message).unwrap(); self.sender.send(message).unwrap();
} }
@ -47,7 +59,11 @@ impl SyncNode {
self.receiver.recv_timeout(Duration::from_millis(500)) self.receiver.recv_timeout(Duration::from_millis(500))
} }
fn recv_rpc_response(&self) -> Result<RPCResponse, RecvTimeoutError> { fn hello_message(&self) -> HelloMessage {
self.harness.beacon_chain.hello_message()
}
fn _recv_rpc_response(&self) -> Result<RPCResponse, RecvTimeoutError> {
let network_message = self.recv()?; let network_message = self.recv()?;
Ok(match network_message { Ok(match network_message {
NetworkMessage::Send( NetworkMessage::Send(
@ -108,12 +124,6 @@ impl SyncMaster {
} }
} }
pub fn build_blocks(&mut self, blocks: usize) {
for _ in 0..blocks {
self.harness.advance_chain_with_block();
}
}
pub fn response_id(&mut self, node: &SyncNode) -> u64 { pub fn response_id(&mut self, node: &SyncNode) -> u64 {
let id = self.response_ids[node.id]; let id = self.response_ids[node.id];
self.response_ids[node.id] += 1; self.response_ids[node.id] += 1;
@ -169,11 +179,11 @@ fn test_setup(
let mut nodes = Vec::with_capacity(node_count); let mut nodes = Vec::with_capacity(node_count);
for id in 0..node_count { for id in 0..node_count {
let local_chain = TestingBeaconChainBuilder::from(state_builder.clone()).build(&spec); let node = SyncNode::from_beacon_state_builder(
let node = SyncNode::new(
id, id,
&runtime.executor(), &runtime.executor(),
Arc::new(local_chain), state_builder.clone(),
&spec,
logger.clone(), logger.clone(),
); );
@ -185,6 +195,15 @@ fn test_setup(
(runtime, master, nodes) (runtime, master, nodes)
} }
pub fn build_blocks(blocks: usize, master: &mut SyncMaster, nodes: &mut Vec<SyncNode>) {
for _ in 0..blocks {
master.harness.advance_chain_with_block();
for i in 0..nodes.len() {
nodes[i].increment_beacon_chain_slot();
}
}
}
#[test] #[test]
fn first_test() { fn first_test() {
let logger = get_logger(); let logger = get_logger();
@ -195,17 +214,20 @@ fn first_test() {
let state_builder = let state_builder =
TestingBeaconStateBuilder::from_default_keypairs_file_if_exists(validator_count, &spec); TestingBeaconStateBuilder::from_default_keypairs_file_if_exists(validator_count, &spec);
let (runtime, mut master, nodes) = test_setup(state_builder, node_count, &spec, logger.clone()); let (runtime, mut master, mut nodes) =
test_setup(state_builder, node_count, &spec, logger.clone());
master.build_blocks(10); let original_node_slot = nodes[0].hello_message().best_slot;
build_blocks(2, &mut master, &mut nodes);
master.do_hello_with(&nodes[0]); master.do_hello_with(&nodes[0]);
assert_sent_block_root_request( assert_sent_block_root_request(
&nodes[0], &nodes[0],
BeaconBlockRootsRequest { BeaconBlockRootsRequest {
start_slot: Slot::new(1), start_slot: original_node_slot,
count: 10, count: 2,
}, },
); );