From 7ee080db6021b2fb4b47056ce0a666020b71b3d9 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Sun, 25 Aug 2019 08:25:54 +1000 Subject: [PATCH] Updated syncing algorithm --- beacon_node/client/src/lib.rs | 6 ++---- beacon_node/eth2-libp2p/Cargo.toml | 4 ++-- beacon_node/eth2-libp2p/src/behaviour.rs | 3 +-- beacon_node/eth2-libp2p/src/service.rs | 2 +- beacon_node/network/src/service.rs | 2 +- beacon_node/network/src/sync/simple_sync.rs | 8 +++++++- beacon_node/rpc/src/attestation.rs | 8 ++++++-- beacon_node/rpc/src/beacon_block.rs | 10 +++++++--- 8 files changed, 27 insertions(+), 16 deletions(-) diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs index 7e6449a98..4b64c1070 100644 --- a/beacon_node/client/src/lib.rs +++ b/beacon_node/client/src/lib.rs @@ -41,7 +41,7 @@ pub struct Client { /// Signal to terminate the slot timer. pub slot_timer_exit_signal: Option, /// Signal to terminate the API - // pub api_exit_signal: Option, + pub api_exit_signal: Option, /// The clients logger. log: slog::Logger, /// Marker to pin the beacon chain generics. @@ -134,7 +134,6 @@ where None }; - /* // Start the `rest_api` service let api_exit_signal = if client_config.rest_api.enabled { match rest_api::start_server( @@ -152,7 +151,6 @@ where } else { None }; - */ let (slot_timer_exit_signal, exit) = exit_future::signal(); if let Ok(Some(duration_to_next_slot)) = beacon_chain.slot_clock.duration_to_next_slot() { @@ -186,7 +184,7 @@ where http_exit_signal, rpc_exit_signal, slot_timer_exit_signal: Some(slot_timer_exit_signal), - //api_exit_signal, + api_exit_signal, log, network, phantom: PhantomData, diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml index 55081aed5..a379bcead 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -7,8 +7,8 @@ edition = "2018" [dependencies] clap = "2.32.0" #SigP repository -libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "b0d3cf7b4b0fa6c555b64dbdd110673a05457abd" } -enr = { git = "https://github.com/SigP/rust-libp2p/", rev = "b0d3cf7b4b0fa6c555b64dbdd110673a05457abd", features = ["serde"] } +libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "a56865a4077ac54767136b4bee627c9734720a6b" } +enr = { git = "https://github.com/SigP/rust-libp2p/", rev = "a56865a4077ac54767136b4bee627c9734720a6b", features = ["serde"] } types = { path = "../../eth2/types" } serde = "1.0" serde_derive = "1.0" diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index b4822de4c..29725e0ce 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -16,7 +16,6 @@ use libp2p::{ NetworkBehaviour, PeerId, }; use slog::{debug, o, trace}; -use ssz::{ssz_encode, Encode}; use std::num::NonZeroU32; use std::time::Duration; @@ -189,7 +188,7 @@ impl Behaviour { } /// Publishes a message on the pubsub (gossipsub) behaviour. - pub fn publish(&mut self, topics: Vec, message: PubsubMessage) { + pub fn publish(&mut self, topics: &[Topic], message: PubsubMessage) { let message_data = message.to_data(); for topic in topics { self.gossipsub.publish(topic, message_data.clone()); diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index 98718445b..9945b1586 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -148,7 +148,7 @@ impl Stream for Service { topics, message, } => { - trace!(self.log, "Gossipsub message received"; "Message" => format!("{:?}", message)); + trace!(self.log, "Gossipsub message received"; "service" => "Swarm"); return Ok(Async::Ready(Some(Libp2pEvent::PubsubMessage { source, topics, diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index df0404cfa..4800a7efb 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -119,7 +119,7 @@ fn network_service( }, NetworkMessage::Publish { topics, message } => { debug!(log, "Sending pubsub message"; "topics" => format!("{:?}",topics)); - libp2p_service.lock().swarm.publish(topics, message); + libp2p_service.lock().swarm.publish(&topics, message); } }, Ok(Async::NotReady) => break, diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index 924b2de9b..bee9310d3 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -465,9 +465,15 @@ impl SimpleSync { pub fn on_block_gossip(&mut self, peer_id: PeerId, block: BeaconBlock) -> bool { if let Ok(outcome) = self.chain.process_block(block.clone()) { match outcome { - BlockProcessingOutcome::Processed { .. } => SHOULD_FORWARD_GOSSIP_BLOCK, + BlockProcessingOutcome::Processed { .. } => { + trace!(self.log, "Gossipsub block processed"; + "peer_id" => format!("{:?}",peer_id)); + SHOULD_FORWARD_GOSSIP_BLOCK + } BlockProcessingOutcome::ParentUnknown { parent: _ } => { // Inform the sync manager to find parents for this block + trace!(self.log, "Unknown parent gossip"; + "peer_id" => format!("{:?}",peer_id)); self.manager.add_unknown_block(block.clone(), peer_id); SHOULD_FORWARD_GOSSIP_BLOCK } diff --git a/beacon_node/rpc/src/attestation.rs b/beacon_node/rpc/src/attestation.rs index f442e247d..dff3f8d70 100644 --- a/beacon_node/rpc/src/attestation.rs +++ b/beacon_node/rpc/src/attestation.rs @@ -1,7 +1,7 @@ use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; use eth2_libp2p::PubsubMessage; use eth2_libp2p::Topic; -use eth2_libp2p::BEACON_ATTESTATION_TOPIC; +use eth2_libp2p::{BEACON_ATTESTATION_TOPIC, TOPIC_ENCODING_POSTFIX, TOPIC_PREFIX}; use futures::Future; use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink}; use network::NetworkMessage; @@ -144,7 +144,11 @@ impl AttestationService for AttestationServiceInstance { ); // valid attestation, propagate to the network - let topic = Topic::new(BEACON_ATTESTATION_TOPIC.into()); + let topic_string = format!( + "/{}/{}/{}", + TOPIC_PREFIX, BEACON_ATTESTATION_TOPIC, TOPIC_ENCODING_POSTFIX + ); + let topic = Topic::new(topic_string); let message = PubsubMessage::Attestation(attestation.as_ssz_bytes()); self.network_chan diff --git a/beacon_node/rpc/src/beacon_block.rs b/beacon_node/rpc/src/beacon_block.rs index b1a67399e..92a543ef3 100644 --- a/beacon_node/rpc/src/beacon_block.rs +++ b/beacon_node/rpc/src/beacon_block.rs @@ -1,6 +1,6 @@ use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; -use eth2_libp2p::BEACON_BLOCK_TOPIC; use eth2_libp2p::{PubsubMessage, Topic}; +use eth2_libp2p::{BEACON_BLOCK_TOPIC, TOPIC_ENCODING_POSTFIX, TOPIC_PREFIX}; use futures::Future; use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink}; use network::NetworkMessage; @@ -105,8 +105,12 @@ impl BeaconBlockService for BeaconBlockServiceInstance { "block_root" => format!("{}", block_root), ); - // get the network topic to send on - let topic = Topic::new(BEACON_BLOCK_TOPIC.into()); + // create the network topic to send on + let topic_string = format!( + "/{}/{}/{}", + TOPIC_PREFIX, BEACON_BLOCK_TOPIC, TOPIC_ENCODING_POSTFIX + ); + let topic = Topic::new(topic_string); let message = PubsubMessage::Block(block.as_ssz_bytes()); // Publish the block to the p2p network via gossipsub.