Updated syncing algorithm

This commit is contained in:
Age Manning 2019-08-25 08:25:54 +10:00
parent 0d56df474a
commit 7ee080db60
No known key found for this signature in database
GPG Key ID: 05EED64B79E06A93
8 changed files with 27 additions and 16 deletions

View File

@ -41,7 +41,7 @@ pub struct Client<T: BeaconChainTypes> {
/// Signal to terminate the slot timer. /// Signal to terminate the slot timer.
pub slot_timer_exit_signal: Option<Signal>, pub slot_timer_exit_signal: Option<Signal>,
/// Signal to terminate the API /// Signal to terminate the API
// pub api_exit_signal: Option<Signal>, pub api_exit_signal: Option<Signal>,
/// The clients logger. /// The clients logger.
log: slog::Logger, log: slog::Logger,
/// Marker to pin the beacon chain generics. /// Marker to pin the beacon chain generics.
@ -134,7 +134,6 @@ where
None None
}; };
/*
// Start the `rest_api` service // Start the `rest_api` service
let api_exit_signal = if client_config.rest_api.enabled { let api_exit_signal = if client_config.rest_api.enabled {
match rest_api::start_server( match rest_api::start_server(
@ -152,7 +151,6 @@ where
} else { } else {
None None
}; };
*/
let (slot_timer_exit_signal, exit) = exit_future::signal(); let (slot_timer_exit_signal, exit) = exit_future::signal();
if let Ok(Some(duration_to_next_slot)) = beacon_chain.slot_clock.duration_to_next_slot() { if let Ok(Some(duration_to_next_slot)) = beacon_chain.slot_clock.duration_to_next_slot() {
@ -186,7 +184,7 @@ where
http_exit_signal, http_exit_signal,
rpc_exit_signal, rpc_exit_signal,
slot_timer_exit_signal: Some(slot_timer_exit_signal), slot_timer_exit_signal: Some(slot_timer_exit_signal),
//api_exit_signal, api_exit_signal,
log, log,
network, network,
phantom: PhantomData, phantom: PhantomData,

View File

@ -7,8 +7,8 @@ edition = "2018"
[dependencies] [dependencies]
clap = "2.32.0" clap = "2.32.0"
#SigP repository #SigP repository
libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "b0d3cf7b4b0fa6c555b64dbdd110673a05457abd" } libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "a56865a4077ac54767136b4bee627c9734720a6b" }
enr = { git = "https://github.com/SigP/rust-libp2p/", rev = "b0d3cf7b4b0fa6c555b64dbdd110673a05457abd", features = ["serde"] } enr = { git = "https://github.com/SigP/rust-libp2p/", rev = "a56865a4077ac54767136b4bee627c9734720a6b", features = ["serde"] }
types = { path = "../../eth2/types" } types = { path = "../../eth2/types" }
serde = "1.0" serde = "1.0"
serde_derive = "1.0" serde_derive = "1.0"

View File

@ -16,7 +16,6 @@ use libp2p::{
NetworkBehaviour, PeerId, NetworkBehaviour, PeerId,
}; };
use slog::{debug, o, trace}; use slog::{debug, o, trace};
use ssz::{ssz_encode, Encode};
use std::num::NonZeroU32; use std::num::NonZeroU32;
use std::time::Duration; use std::time::Duration;
@ -189,7 +188,7 @@ impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
} }
/// Publishes a message on the pubsub (gossipsub) behaviour. /// Publishes a message on the pubsub (gossipsub) behaviour.
pub fn publish(&mut self, topics: Vec<Topic>, message: PubsubMessage) { pub fn publish(&mut self, topics: &[Topic], message: PubsubMessage) {
let message_data = message.to_data(); let message_data = message.to_data();
for topic in topics { for topic in topics {
self.gossipsub.publish(topic, message_data.clone()); self.gossipsub.publish(topic, message_data.clone());

View File

@ -148,7 +148,7 @@ impl Stream for Service {
topics, topics,
message, message,
} => { } => {
trace!(self.log, "Gossipsub message received"; "Message" => format!("{:?}", message)); trace!(self.log, "Gossipsub message received"; "service" => "Swarm");
return Ok(Async::Ready(Some(Libp2pEvent::PubsubMessage { return Ok(Async::Ready(Some(Libp2pEvent::PubsubMessage {
source, source,
topics, topics,

View File

@ -119,7 +119,7 @@ fn network_service(
}, },
NetworkMessage::Publish { topics, message } => { NetworkMessage::Publish { topics, message } => {
debug!(log, "Sending pubsub message"; "topics" => format!("{:?}",topics)); debug!(log, "Sending pubsub message"; "topics" => format!("{:?}",topics));
libp2p_service.lock().swarm.publish(topics, message); libp2p_service.lock().swarm.publish(&topics, message);
} }
}, },
Ok(Async::NotReady) => break, Ok(Async::NotReady) => break,

View File

@ -465,9 +465,15 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
pub fn on_block_gossip(&mut self, peer_id: PeerId, block: BeaconBlock<T::EthSpec>) -> bool { pub fn on_block_gossip(&mut self, peer_id: PeerId, block: BeaconBlock<T::EthSpec>) -> bool {
if let Ok(outcome) = self.chain.process_block(block.clone()) { if let Ok(outcome) = self.chain.process_block(block.clone()) {
match outcome { match outcome {
BlockProcessingOutcome::Processed { .. } => SHOULD_FORWARD_GOSSIP_BLOCK, BlockProcessingOutcome::Processed { .. } => {
trace!(self.log, "Gossipsub block processed";
"peer_id" => format!("{:?}",peer_id));
SHOULD_FORWARD_GOSSIP_BLOCK
}
BlockProcessingOutcome::ParentUnknown { parent: _ } => { BlockProcessingOutcome::ParentUnknown { parent: _ } => {
// Inform the sync manager to find parents for this block // Inform the sync manager to find parents for this block
trace!(self.log, "Unknown parent gossip";
"peer_id" => format!("{:?}",peer_id));
self.manager.add_unknown_block(block.clone(), peer_id); self.manager.add_unknown_block(block.clone(), peer_id);
SHOULD_FORWARD_GOSSIP_BLOCK SHOULD_FORWARD_GOSSIP_BLOCK
} }

View File

@ -1,7 +1,7 @@
use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes};
use eth2_libp2p::PubsubMessage; use eth2_libp2p::PubsubMessage;
use eth2_libp2p::Topic; use eth2_libp2p::Topic;
use eth2_libp2p::BEACON_ATTESTATION_TOPIC; use eth2_libp2p::{BEACON_ATTESTATION_TOPIC, TOPIC_ENCODING_POSTFIX, TOPIC_PREFIX};
use futures::Future; use futures::Future;
use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink}; use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink};
use network::NetworkMessage; use network::NetworkMessage;
@ -144,7 +144,11 @@ impl<T: BeaconChainTypes> AttestationService for AttestationServiceInstance<T> {
); );
// valid attestation, propagate to the network // valid attestation, propagate to the network
let topic = Topic::new(BEACON_ATTESTATION_TOPIC.into()); let topic_string = format!(
"/{}/{}/{}",
TOPIC_PREFIX, BEACON_ATTESTATION_TOPIC, TOPIC_ENCODING_POSTFIX
);
let topic = Topic::new(topic_string);
let message = PubsubMessage::Attestation(attestation.as_ssz_bytes()); let message = PubsubMessage::Attestation(attestation.as_ssz_bytes());
self.network_chan self.network_chan

View File

@ -1,6 +1,6 @@
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome};
use eth2_libp2p::BEACON_BLOCK_TOPIC;
use eth2_libp2p::{PubsubMessage, Topic}; use eth2_libp2p::{PubsubMessage, Topic};
use eth2_libp2p::{BEACON_BLOCK_TOPIC, TOPIC_ENCODING_POSTFIX, TOPIC_PREFIX};
use futures::Future; use futures::Future;
use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink}; use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink};
use network::NetworkMessage; use network::NetworkMessage;
@ -105,8 +105,12 @@ impl<T: BeaconChainTypes> BeaconBlockService for BeaconBlockServiceInstance<T> {
"block_root" => format!("{}", block_root), "block_root" => format!("{}", block_root),
); );
// get the network topic to send on // create the network topic to send on
let topic = Topic::new(BEACON_BLOCK_TOPIC.into()); let topic_string = format!(
"/{}/{}/{}",
TOPIC_PREFIX, BEACON_BLOCK_TOPIC, TOPIC_ENCODING_POSTFIX
);
let topic = Topic::new(topic_string);
let message = PubsubMessage::Block(block.as_ssz_bytes()); let message = PubsubMessage::Block(block.as_ssz_bytes());
// Publish the block to the p2p network via gossipsub. // Publish the block to the p2p network via gossipsub.