Add gossip-test branch
This commit is contained in:
		
						commit
						d7289ab55e
					
				| @ -14,6 +14,7 @@ slog-term = "^2.4.0" | ||||
| slog-async = "^2.3.0" | ||||
| ctrlc = { version = "3.1.1", features = ["termination"] } | ||||
| tokio = "0.1.15" | ||||
| tokio-timer = "0.2.10" | ||||
| futures = "0.1.25" | ||||
| exit-future = "0.1.3" | ||||
| state_processing = { path = "../eth2/state_processing" } | ||||
|  | ||||
| @ -342,6 +342,10 @@ where | ||||
| 
 | ||||
|         // If required, transition the new state to the present slot.
 | ||||
|         for _ in state.slot.as_u64()..present_slot.as_u64() { | ||||
|             // Ensure the next epoch state caches are built in case of an epoch transition.
 | ||||
|             state.build_epoch_cache(RelativeEpoch::NextWithoutRegistryChange, &self.spec)?; | ||||
|             state.build_epoch_cache(RelativeEpoch::NextWithRegistryChange, &self.spec)?; | ||||
| 
 | ||||
|             per_slot_processing(&mut *state, &latest_block_header, &self.spec)?; | ||||
|         } | ||||
| 
 | ||||
| @ -405,6 +409,20 @@ where | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     /// Reads the slot clock (see `self.read_slot_clock()` and returns the number of slots since
 | ||||
|     /// genesis.
 | ||||
|     pub fn slots_since_genesis(&self) -> Option<SlotHeight> { | ||||
|         let now = self.read_slot_clock()?; | ||||
| 
 | ||||
|         if now < self.spec.genesis_slot { | ||||
|             None | ||||
|         } else { | ||||
|             Some(SlotHeight::from( | ||||
|                 now.as_u64() - self.spec.genesis_slot.as_u64(), | ||||
|             )) | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     /// Returns slot of the present state.
 | ||||
|     ///
 | ||||
|     /// This is distinct to `read_slot_clock`, which reads from the actual system clock. If
 | ||||
|  | ||||
| @ -28,15 +28,19 @@ pub fn initialise_beacon_chain( | ||||
|     let block_store = Arc::new(BeaconBlockStore::new(db.clone())); | ||||
|     let state_store = Arc::new(BeaconStateStore::new(db.clone())); | ||||
| 
 | ||||
|     let state_builder = TestingBeaconStateBuilder::from_deterministic_keypairs(8, &spec); | ||||
|     let state_builder = TestingBeaconStateBuilder::from_default_keypairs_file_if_exists(8, &spec); | ||||
|     let (genesis_state, _keypairs) = state_builder.build(); | ||||
| 
 | ||||
|     let mut genesis_block = BeaconBlock::empty(&spec); | ||||
|     genesis_block.state_root = Hash256::from_slice(&genesis_state.hash_tree_root()); | ||||
| 
 | ||||
|     // Slot clock
 | ||||
|     let slot_clock = SystemTimeSlotClock::new(genesis_state.genesis_time, spec.seconds_per_slot) | ||||
|         .expect("Unable to load SystemTimeSlotClock"); | ||||
|     let slot_clock = SystemTimeSlotClock::new( | ||||
|         spec.genesis_slot, | ||||
|         genesis_state.genesis_time, | ||||
|         spec.seconds_per_slot, | ||||
|     ) | ||||
|     .expect("Unable to load SystemTimeSlotClock"); | ||||
|     // Choose the fork choice
 | ||||
|     let fork_choice = BitwiseLMDGhost::new(block_store.clone(), state_store.clone()); | ||||
| 
 | ||||
| @ -65,15 +69,19 @@ pub fn initialise_test_beacon_chain( | ||||
|     let block_store = Arc::new(BeaconBlockStore::new(db.clone())); | ||||
|     let state_store = Arc::new(BeaconStateStore::new(db.clone())); | ||||
| 
 | ||||
|     let state_builder = TestingBeaconStateBuilder::from_deterministic_keypairs(8, spec); | ||||
|     let state_builder = TestingBeaconStateBuilder::from_default_keypairs_file_if_exists(8, spec); | ||||
|     let (genesis_state, _keypairs) = state_builder.build(); | ||||
| 
 | ||||
|     let mut genesis_block = BeaconBlock::empty(spec); | ||||
|     genesis_block.state_root = Hash256::from_slice(&genesis_state.hash_tree_root()); | ||||
| 
 | ||||
|     // Slot clock
 | ||||
|     let slot_clock = SystemTimeSlotClock::new(genesis_state.genesis_time, spec.seconds_per_slot) | ||||
|         .expect("Unable to load SystemTimeSlotClock"); | ||||
|     let slot_clock = SystemTimeSlotClock::new( | ||||
|         spec.genesis_slot, | ||||
|         genesis_state.genesis_time, | ||||
|         spec.seconds_per_slot, | ||||
|     ) | ||||
|     .expect("Unable to load SystemTimeSlotClock"); | ||||
|     // Choose the fork choice
 | ||||
|     let fork_choice = BitwiseLMDGhost::new(block_store.clone(), state_store.clone()); | ||||
| 
 | ||||
|  | ||||
| @ -14,6 +14,7 @@ types = { path = "../../eth2/types" } | ||||
| slot_clock = { path = "../../eth2/utils/slot_clock" } | ||||
| error-chain = "0.12.0" | ||||
| slog = "^2.2.3" | ||||
| ssz = { path = "../../eth2/utils/ssz" } | ||||
| tokio = "0.1.15" | ||||
| clap = "2.32.0" | ||||
| dirs = "1.0.3" | ||||
|  | ||||
| @ -8,12 +8,20 @@ pub mod notifier; | ||||
| use beacon_chain::BeaconChain; | ||||
| pub use client_config::ClientConfig; | ||||
| pub use client_types::ClientTypes; | ||||
| use db::ClientDB; | ||||
| use exit_future::Signal; | ||||
| use fork_choice::ForkChoice; | ||||
| use futures::{future::Future, Stream}; | ||||
| use network::Service as NetworkService; | ||||
| use slog::o; | ||||
| use slog::{error, info, o}; | ||||
| use slot_clock::SlotClock; | ||||
| use ssz::TreeHash; | ||||
| use std::marker::PhantomData; | ||||
| use std::sync::Arc; | ||||
| use std::time::{Duration, Instant}; | ||||
| use tokio::runtime::TaskExecutor; | ||||
| use tokio::timer::Interval; | ||||
| use types::Hash256; | ||||
| 
 | ||||
| /// Main beacon node client service. This provides the connection and initialisation of the clients
 | ||||
| /// sub-services in multiple threads.
 | ||||
| @ -26,6 +34,8 @@ pub struct Client<T: ClientTypes> { | ||||
|     pub network: Arc<NetworkService>, | ||||
|     /// Signal to terminate the RPC server.
 | ||||
|     pub rpc_exit_signal: Option<Signal>, | ||||
|     /// Signal to terminate the slot timer.
 | ||||
|     pub slot_timer_exit_signal: Option<Signal>, | ||||
|     /// The clients logger.
 | ||||
|     log: slog::Logger, | ||||
|     /// Marker to pin the beacon chain generics.
 | ||||
| @ -42,6 +52,35 @@ impl<TClientType: ClientTypes> Client<TClientType> { | ||||
|         // generate a beacon chain
 | ||||
|         let beacon_chain = TClientType::initialise_beacon_chain(&config); | ||||
| 
 | ||||
|         if beacon_chain.read_slot_clock().is_none() { | ||||
|             panic!("Cannot start client before genesis!") | ||||
|         } | ||||
| 
 | ||||
|         // Block starting the client until we have caught the state up to the current slot.
 | ||||
|         //
 | ||||
|         // If we don't block here we create an initial scenario where we're unable to process any
 | ||||
|         // blocks and we're basically useless.
 | ||||
|         { | ||||
|             let state_slot = beacon_chain.state.read().slot; | ||||
|             let wall_clock_slot = beacon_chain.read_slot_clock().unwrap(); | ||||
|             let slots_since_genesis = beacon_chain.slots_since_genesis().unwrap(); | ||||
|             info!( | ||||
|                 log, | ||||
|                 "Initializing state"; | ||||
|                 "state_slot" => state_slot, | ||||
|                 "wall_clock_slot" => wall_clock_slot, | ||||
|                 "slots_since_genesis" => slots_since_genesis, | ||||
|                 "catchup_distance" => wall_clock_slot - state_slot, | ||||
|             ); | ||||
|         } | ||||
|         do_state_catchup(&beacon_chain, &log); | ||||
|         info!( | ||||
|             log, | ||||
|             "State initialized"; | ||||
|             "state_slot" => beacon_chain.state.read().slot, | ||||
|             "wall_clock_slot" => beacon_chain.read_slot_clock().unwrap(), | ||||
|         ); | ||||
| 
 | ||||
|         // Start the network service, libp2p and syncing threads
 | ||||
|         // TODO: Add beacon_chain reference to network parameters
 | ||||
|         let network_config = &config.net_conf; | ||||
| @ -65,13 +104,73 @@ impl<TClientType: ClientTypes> Client<TClientType> { | ||||
|             )); | ||||
|         } | ||||
| 
 | ||||
|         let (slot_timer_exit_signal, exit) = exit_future::signal(); | ||||
|         if let Ok(Some(duration_to_next_slot)) = beacon_chain.slot_clock.duration_to_next_slot() { | ||||
|             // set up the validator work interval - start at next slot and proceed every slot
 | ||||
|             let interval = { | ||||
|                 // Set the interval to start at the next slot, and every slot after
 | ||||
|                 let slot_duration = Duration::from_secs(config.spec.seconds_per_slot); | ||||
|                 //TODO: Handle checked add correctly
 | ||||
|                 Interval::new(Instant::now() + duration_to_next_slot, slot_duration) | ||||
|             }; | ||||
| 
 | ||||
|             let chain = beacon_chain.clone(); | ||||
|             let log = log.new(o!("Service" => "SlotTimer")); | ||||
|             executor.spawn( | ||||
|                 exit.until( | ||||
|                     interval | ||||
|                         .for_each(move |_| { | ||||
|                             do_state_catchup(&chain, &log); | ||||
| 
 | ||||
|                             Ok(()) | ||||
|                         }) | ||||
|                         .map_err(|_| ()), | ||||
|                 ) | ||||
|                 .map(|_| ()), | ||||
|             ); | ||||
|         } | ||||
| 
 | ||||
|         Ok(Client { | ||||
|             config, | ||||
|             beacon_chain, | ||||
|             rpc_exit_signal, | ||||
|             slot_timer_exit_signal: Some(slot_timer_exit_signal), | ||||
|             log, | ||||
|             network, | ||||
|             phantom: PhantomData, | ||||
|         }) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| fn do_state_catchup<T, U, F>(chain: &Arc<BeaconChain<T, U, F>>, log: &slog::Logger) | ||||
| where | ||||
|     T: ClientDB, | ||||
|     U: SlotClock, | ||||
|     F: ForkChoice, | ||||
| { | ||||
|     if let Some(genesis_height) = chain.slots_since_genesis() { | ||||
|         let result = chain.catchup_state(); | ||||
| 
 | ||||
|         let common = o!( | ||||
|             "best_slot" => chain.head().beacon_block.slot, | ||||
|             "latest_block_root" => format!("{}", chain.head().beacon_block_root), | ||||
|             "wall_clock_slot" => chain.read_slot_clock().unwrap(), | ||||
|             "state_slot" => chain.state.read().slot, | ||||
|             "slots_since_genesis" => genesis_height, | ||||
|         ); | ||||
| 
 | ||||
|         match result { | ||||
|             Ok(_) => info!( | ||||
|                 log, | ||||
|                 "NewSlot"; | ||||
|                 common | ||||
|             ), | ||||
|             Err(e) => error!( | ||||
|                 log, | ||||
|                 "StateCatchupFailed"; | ||||
|                 "error" => format!("{:?}", e), | ||||
|                 common | ||||
|             ), | ||||
|         }; | ||||
|     } | ||||
| } | ||||
|  | ||||
| @ -2,7 +2,7 @@ use crate::Client; | ||||
| use crate::ClientTypes; | ||||
| use exit_future::Exit; | ||||
| use futures::{Future, Stream}; | ||||
| use slog::{debug, info, o}; | ||||
| use slog::{debug, o}; | ||||
| use std::sync::{Arc, Mutex}; | ||||
| use std::time::{Duration, Instant}; | ||||
| use tokio::runtime::TaskExecutor; | ||||
| @ -22,7 +22,7 @@ pub fn run<T: ClientTypes>(client: &Client<T>, executor: TaskExecutor, exit: Exi | ||||
| 
 | ||||
|     // build heartbeat logic here
 | ||||
|     let heartbeat = move |_| { | ||||
|         info!(log, "Temp heartbeat output"); | ||||
|         debug!(log, "Temp heartbeat output"); | ||||
|         //TODO: Remove this logic. Testing only
 | ||||
|         let mut count = counter.lock().unwrap(); | ||||
|         *count += 1; | ||||
|  | ||||
| @ -49,12 +49,15 @@ impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<GossipsubE | ||||
|     fn inject_event(&mut self, event: GossipsubEvent) { | ||||
|         match event { | ||||
|             GossipsubEvent::Message(gs_msg) => { | ||||
|                 debug!(self.log, "Received GossipEvent"; "msg" => format!("{:?}", gs_msg)); | ||||
| 
 | ||||
|                 let pubsub_message = match PubsubMessage::ssz_decode(&gs_msg.data, 0) { | ||||
|                     //TODO: Punish peer on error
 | ||||
|                     Err(e) => { | ||||
|                         warn!( | ||||
|                             self.log, | ||||
|                             "Received undecodable message from Peer {:?}", gs_msg.source | ||||
|                             "Received undecodable message from Peer {:?} error", gs_msg.source; | ||||
|                             "error" => format!("{:?}", e) | ||||
|                         ); | ||||
|                         return; | ||||
|                     } | ||||
| @ -192,7 +195,7 @@ pub enum BehaviourEvent { | ||||
| } | ||||
| 
 | ||||
| /// Messages that are passed to and from the pubsub (Gossipsub) behaviour.
 | ||||
| #[derive(Debug, Clone)] | ||||
| #[derive(Debug, Clone, PartialEq)] | ||||
| pub enum PubsubMessage { | ||||
|     /// Gossipsub message providing notification of a new block.
 | ||||
|     Block(BlockRootSlot), | ||||
| @ -220,11 +223,11 @@ impl Decodable for PubsubMessage { | ||||
|     fn ssz_decode(bytes: &[u8], index: usize) -> Result<(Self, usize), DecodeError> { | ||||
|         let (id, index) = u32::ssz_decode(bytes, index)?; | ||||
|         match id { | ||||
|             1 => { | ||||
|             0 => { | ||||
|                 let (block, index) = BlockRootSlot::ssz_decode(bytes, index)?; | ||||
|                 Ok((PubsubMessage::Block(block), index)) | ||||
|             } | ||||
|             2 => { | ||||
|             1 => { | ||||
|                 let (attestation, index) = Attestation::ssz_decode(bytes, index)?; | ||||
|                 Ok((PubsubMessage::Attestation(attestation), index)) | ||||
|             } | ||||
| @ -232,3 +235,25 @@ impl Decodable for PubsubMessage { | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[cfg(test)] | ||||
| mod test { | ||||
|     use super::*; | ||||
|     use types::*; | ||||
| 
 | ||||
|     #[test] | ||||
|     fn ssz_encoding() { | ||||
|         let original = PubsubMessage::Block(BlockRootSlot { | ||||
|             block_root: Hash256::from_slice(&[42; 32]), | ||||
|             slot: Slot::new(4), | ||||
|         }); | ||||
| 
 | ||||
|         let encoded = ssz_encode(&original); | ||||
| 
 | ||||
|         println!("{:?}", encoded); | ||||
| 
 | ||||
|         let (decoded, _i) = PubsubMessage::ssz_decode(&encoded, 0).unwrap(); | ||||
| 
 | ||||
|         assert_eq!(original, decoded); | ||||
|     } | ||||
| } | ||||
|  | ||||
| @ -523,9 +523,9 @@ impl SimpleSync { | ||||
|         msg: BlockRootSlot, | ||||
|         network: &mut NetworkContext, | ||||
|     ) { | ||||
|         debug!( | ||||
|         info!( | ||||
|             self.log, | ||||
|             "BlockSlot"; | ||||
|             "NewGossipBlock"; | ||||
|             "peer" => format!("{:?}", peer_id), | ||||
|         ); | ||||
|         // TODO: filter out messages that a prior to the finalized slot.
 | ||||
| @ -557,9 +557,9 @@ impl SimpleSync { | ||||
|         msg: Attestation, | ||||
|         _network: &mut NetworkContext, | ||||
|     ) { | ||||
|         debug!( | ||||
|         info!( | ||||
|             self.log, | ||||
|             "Attestation"; | ||||
|             "NewAttestationGossip"; | ||||
|             "peer" => format!("{:?}", peer_id), | ||||
|         ); | ||||
| 
 | ||||
|  | ||||
| @ -1,3 +1,4 @@ | ||||
| use crate::beacon_chain::BeaconChain; | ||||
| use crossbeam_channel; | ||||
| use eth2_libp2p::rpc::methods::BlockRootSlot; | ||||
| use eth2_libp2p::PubsubMessage; | ||||
| @ -10,10 +11,14 @@ use protos::services::{ | ||||
| }; | ||||
| use protos::services_grpc::BeaconBlockService; | ||||
| use slog::Logger; | ||||
| use types::{Hash256, Slot}; | ||||
| use slog::{debug, error, info, warn}; | ||||
| use ssz::{Decodable, TreeHash}; | ||||
| use std::sync::Arc; | ||||
| use types::{BeaconBlock, Hash256, Slot}; | ||||
| 
 | ||||
| #[derive(Clone)] | ||||
| pub struct BeaconBlockServiceInstance { | ||||
|     pub chain: Arc<BeaconChain>, | ||||
|     pub network_chan: crossbeam_channel::Sender<NetworkMessage>, | ||||
|     pub log: Logger, | ||||
| } | ||||
| @ -30,8 +35,7 @@ impl BeaconBlockService for BeaconBlockServiceInstance { | ||||
| 
 | ||||
|         // TODO: build a legit block.
 | ||||
|         let mut block = BeaconBlockProto::new(); | ||||
|         block.set_slot(req.get_slot()); | ||||
|         block.set_block_root(b"cats".to_vec()); | ||||
|         block.set_ssz(b"cats".to_vec()); | ||||
| 
 | ||||
|         let mut resp = ProduceBeaconBlockResponse::new(); | ||||
|         resp.set_block(block); | ||||
| @ -49,26 +53,88 @@ impl BeaconBlockService for BeaconBlockServiceInstance { | ||||
|         req: PublishBeaconBlockRequest, | ||||
|         sink: UnarySink<PublishBeaconBlockResponse>, | ||||
|     ) { | ||||
|         let block = req.get_block(); | ||||
|         let block_root = Hash256::from_slice(block.get_block_root()); | ||||
|         let block_slot = BlockRootSlot { | ||||
|             block_root, | ||||
|             slot: Slot::from(block.get_slot()), | ||||
|         }; | ||||
|         println!("publishing block with root {:?}", block_root); | ||||
| 
 | ||||
|         // TODO: Obtain topics from the network service properly.
 | ||||
|         let topic = types::TopicBuilder::new("beacon_chain".to_string()).build(); | ||||
|         let message = PubsubMessage::Block(block_slot); | ||||
|         println!("Sending beacon block to gossipsub"); | ||||
|         self.network_chan.send(NetworkMessage::Publish { | ||||
|             topics: vec![topic], | ||||
|             message, | ||||
|         }); | ||||
| 
 | ||||
|         // TODO: actually process the block.
 | ||||
|         let mut resp = PublishBeaconBlockResponse::new(); | ||||
|         resp.set_success(true); | ||||
| 
 | ||||
|         let ssz_serialized_block = req.get_block().get_ssz(); | ||||
| 
 | ||||
|         match BeaconBlock::ssz_decode(ssz_serialized_block, 0) { | ||||
|             Ok((block, _i)) => { | ||||
|                 let block_root = Hash256::from_slice(&block.hash_tree_root()[..]); | ||||
| 
 | ||||
|                 match self.chain.process_block(block.clone()) { | ||||
|                     Ok(outcome) => { | ||||
|                         if outcome.sucessfully_processed() { | ||||
|                             // Block was successfully processed.
 | ||||
|                             info!( | ||||
|                                 self.log, | ||||
|                                 "PublishBeaconBlock"; | ||||
|                                 "type" => "valid_block", | ||||
|                                 "block_slot" => block.slot, | ||||
|                                 "outcome" => format!("{:?}", outcome) | ||||
|                             ); | ||||
| 
 | ||||
|                             // TODO: Obtain topics from the network service properly.
 | ||||
|                             let topic = | ||||
|                                 types::TopicBuilder::new("beacon_chain".to_string()).build(); | ||||
|                             let message = PubsubMessage::Block(BlockRootSlot { | ||||
|                                 block_root, | ||||
|                                 slot: block.slot, | ||||
|                             }); | ||||
| 
 | ||||
|                             println!("Sending beacon block to gossipsub"); | ||||
|                             self.network_chan.send(NetworkMessage::Publish { | ||||
|                                 topics: vec![topic], | ||||
|                                 message, | ||||
|                             }); | ||||
| 
 | ||||
|                             resp.set_success(true); | ||||
|                         } else if outcome.is_invalid() { | ||||
|                             // Block was invalid.
 | ||||
|                             warn!( | ||||
|                                 self.log, | ||||
|                                 "PublishBeaconBlock"; | ||||
|                                 "type" => "invalid_block", | ||||
|                                 "outcome" => format!("{:?}", outcome) | ||||
|                             ); | ||||
| 
 | ||||
|                             resp.set_success(false); | ||||
|                             resp.set_msg( | ||||
|                                 format!("InvalidBlock: {:?}", outcome).as_bytes().to_vec(), | ||||
|                             ); | ||||
|                         } else { | ||||
|                             // Some failure during processing.
 | ||||
|                             warn!( | ||||
|                                 self.log, | ||||
|                                 "PublishBeaconBlock"; | ||||
|                                 "type" => "unable_to_import", | ||||
|                                 "outcome" => format!("{:?}", outcome) | ||||
|                             ); | ||||
| 
 | ||||
|                             resp.set_success(false); | ||||
|                             resp.set_msg(format!("other: {:?}", outcome).as_bytes().to_vec()); | ||||
|                         } | ||||
|                     } | ||||
|                     Err(e) => { | ||||
|                         // Some failure during processing.
 | ||||
|                         error!( | ||||
|                             self.log, | ||||
|                             "PublishBeaconBlock"; | ||||
|                             "type" => "failed_to_process", | ||||
|                             "error" => format!("{:?}", e) | ||||
|                         ); | ||||
| 
 | ||||
|                         resp.set_success(false); | ||||
|                         resp.set_msg(format!("failed_to_process: {:?}", e).as_bytes().to_vec()); | ||||
|                     } | ||||
|                 } | ||||
| 
 | ||||
|                 resp.set_success(true); | ||||
|             } | ||||
|             Err(_) => { | ||||
|                 resp.set_success(false); | ||||
|                 resp.set_msg(b"Invalid SSZ".to_vec()); | ||||
|             } | ||||
|         }; | ||||
| 
 | ||||
|         let f = sink | ||||
|             .success(resp) | ||||
|  | ||||
| @ -5,14 +5,18 @@ use beacon_chain::{ | ||||
|     parking_lot::RwLockReadGuard, | ||||
|     slot_clock::SlotClock, | ||||
|     types::{BeaconState, ChainSpec}, | ||||
|     CheckPoint, | ||||
| }; | ||||
| pub use beacon_chain::{BeaconChainError, BlockProcessingOutcome}; | ||||
| use types::BeaconBlock; | ||||
| 
 | ||||
| /// The RPC's API to the beacon chain.
 | ||||
| pub trait BeaconChain: Send + Sync { | ||||
|     fn get_spec(&self) -> &ChainSpec; | ||||
| 
 | ||||
|     fn get_state(&self) -> RwLockReadGuard<BeaconState>; | ||||
| 
 | ||||
|     fn process_block(&self, block: BeaconBlock) | ||||
|         -> Result<BlockProcessingOutcome, BeaconChainError>; | ||||
| } | ||||
| 
 | ||||
| impl<T, U, F> BeaconChain for RawBeaconChain<T, U, F> | ||||
| @ -28,4 +32,11 @@ where | ||||
|     fn get_state(&self) -> RwLockReadGuard<BeaconState> { | ||||
|         self.state.read() | ||||
|     } | ||||
| 
 | ||||
|     fn process_block( | ||||
|         &self, | ||||
|         block: BeaconBlock, | ||||
|     ) -> Result<BlockProcessingOutcome, BeaconChainError> { | ||||
|         self.process_block(block) | ||||
|     } | ||||
| } | ||||
|  | ||||
| @ -1,7 +1,7 @@ | ||||
| use crate::beacon_chain::BeaconChain; | ||||
| use futures::Future; | ||||
| use grpcio::{RpcContext, UnarySink}; | ||||
| use protos::services::{Empty, Fork, NodeInfo}; | ||||
| use protos::services::{Empty, Fork, NodeInfoResponse}; | ||||
| use protos::services_grpc::BeaconNodeService; | ||||
| use slog::{trace, warn}; | ||||
| use std::sync::Arc; | ||||
| @ -14,11 +14,11 @@ pub struct BeaconNodeServiceInstance { | ||||
| 
 | ||||
| impl BeaconNodeService for BeaconNodeServiceInstance { | ||||
|     /// Provides basic node information.
 | ||||
|     fn info(&mut self, ctx: RpcContext, _req: Empty, sink: UnarySink<NodeInfo>) { | ||||
|     fn info(&mut self, ctx: RpcContext, _req: Empty, sink: UnarySink<NodeInfoResponse>) { | ||||
|         trace!(self.log, "Node info requested via RPC"); | ||||
| 
 | ||||
|         // build the response
 | ||||
|         let mut node_info = NodeInfo::new(); | ||||
|         let mut node_info = NodeInfoResponse::new(); | ||||
|         node_info.set_version(version::version()); | ||||
| 
 | ||||
|         // get the chain state
 | ||||
| @ -34,6 +34,7 @@ impl BeaconNodeService for BeaconNodeServiceInstance { | ||||
| 
 | ||||
|         node_info.set_fork(fork); | ||||
|         node_info.set_genesis_time(genesis_time); | ||||
|         node_info.set_genesis_slot(self.chain.get_spec().genesis_slot.as_u64()); | ||||
|         node_info.set_chain_id(self.chain.get_spec().chain_id as u32); | ||||
| 
 | ||||
|         // send the node_info the requester
 | ||||
|  | ||||
| @ -43,6 +43,7 @@ pub fn start_server( | ||||
| 
 | ||||
|     let beacon_block_service = { | ||||
|         let instance = BeaconBlockServiceInstance { | ||||
|             chain: beacon_chain.clone(), | ||||
|             network_chan, | ||||
|             log: log.clone(), | ||||
|         }; | ||||
|  | ||||
| @ -6,10 +6,12 @@ use futures::Future; | ||||
| use slog::info; | ||||
| use std::cell::RefCell; | ||||
| use tokio::runtime::Builder; | ||||
| use tokio_timer::clock::Clock; | ||||
| 
 | ||||
| pub fn run_beacon_node(config: ClientConfig, log: &slog::Logger) -> error::Result<()> { | ||||
|     let mut runtime = Builder::new() | ||||
|         .name_prefix("main-") | ||||
|         .clock(Clock::system()) | ||||
|         .build() | ||||
|         .map_err(|e| format!("{:?}", e))?; | ||||
| 
 | ||||
|  | ||||
| @ -20,7 +20,6 @@ pub fn per_slot_processing( | ||||
| 
 | ||||
|     if (state.slot + 1) % spec.slots_per_epoch == 0 { | ||||
|         per_epoch_processing(state, spec)?; | ||||
|         state.advance_caches(); | ||||
|     } | ||||
| 
 | ||||
|     state.slot += 1; | ||||
|  | ||||
| @ -120,8 +120,10 @@ impl TestingBeaconStateBuilder { | ||||
|             }) | ||||
|             .collect(); | ||||
| 
 | ||||
|         let genesis_time = 1553647464; // arbitrary
 | ||||
| 
 | ||||
|         let mut state = BeaconState::genesis( | ||||
|             0, | ||||
|             genesis_time, | ||||
|             Eth1Data { | ||||
|                 deposit_root: Hash256::zero(), | ||||
|                 block_hash: Hash256::zero(), | ||||
|  | ||||
| @ -3,10 +3,13 @@ mod testing_slot_clock; | ||||
| 
 | ||||
| pub use crate::system_time_slot_clock::{Error as SystemTimeSlotClockError, SystemTimeSlotClock}; | ||||
| pub use crate::testing_slot_clock::{Error as TestingSlotClockError, TestingSlotClock}; | ||||
| use std::time::Duration; | ||||
| pub use types::Slot; | ||||
| 
 | ||||
| pub trait SlotClock: Send + Sync { | ||||
|     type Error; | ||||
| 
 | ||||
|     fn present_slot(&self) -> Result<Option<Slot>, Self::Error>; | ||||
| 
 | ||||
|     fn duration_to_next_slot(&self) -> Result<Option<Duration>, Self::Error>; | ||||
| } | ||||
|  | ||||
| @ -13,6 +13,7 @@ pub enum Error { | ||||
| /// Determines the present slot based upon the present system time.
 | ||||
| #[derive(Clone)] | ||||
| pub struct SystemTimeSlotClock { | ||||
|     genesis_slot: Slot, | ||||
|     genesis_seconds: u64, | ||||
|     slot_duration_seconds: u64, | ||||
| } | ||||
| @ -22,6 +23,7 @@ impl SystemTimeSlotClock { | ||||
|     ///
 | ||||
|     /// Returns an Error if `slot_duration_seconds == 0`.
 | ||||
|     pub fn new( | ||||
|         genesis_slot: Slot, | ||||
|         genesis_seconds: u64, | ||||
|         slot_duration_seconds: u64, | ||||
|     ) -> Result<SystemTimeSlotClock, Error> { | ||||
| @ -29,6 +31,7 @@ impl SystemTimeSlotClock { | ||||
|             Err(Error::SlotDurationIsZero) | ||||
|         } else { | ||||
|             Ok(Self { | ||||
|                 genesis_slot, | ||||
|                 genesis_seconds, | ||||
|                 slot_duration_seconds, | ||||
|             }) | ||||
| @ -44,11 +47,17 @@ impl SlotClock for SystemTimeSlotClock { | ||||
|         let duration_since_epoch = syslot_time.duration_since(SystemTime::UNIX_EPOCH)?; | ||||
|         let duration_since_genesis = | ||||
|             duration_since_epoch.checked_sub(Duration::from_secs(self.genesis_seconds)); | ||||
| 
 | ||||
|         match duration_since_genesis { | ||||
|             None => Ok(None), | ||||
|             Some(d) => Ok(slot_from_duration(self.slot_duration_seconds, d)), | ||||
|             Some(d) => Ok(slot_from_duration(self.slot_duration_seconds, d) | ||||
|                 .and_then(|s| Some(s + self.genesis_slot))), | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     fn duration_to_next_slot(&self) -> Result<Option<Duration>, Error> { | ||||
|         duration_to_next_slot(self.genesis_seconds, self.slot_duration_seconds) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl From<SystemTimeError> for Error { | ||||
| @ -62,6 +71,30 @@ fn slot_from_duration(slot_duration_seconds: u64, duration: Duration) -> Option< | ||||
|         duration.as_secs().checked_div(slot_duration_seconds)?, | ||||
|     )) | ||||
| } | ||||
| // calculate the duration to the next slot
 | ||||
| fn duration_to_next_slot( | ||||
|     genesis_time: u64, | ||||
|     seconds_per_slot: u64, | ||||
| ) -> Result<Option<Duration>, Error> { | ||||
|     let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?; | ||||
|     let genesis_time = Duration::from_secs(genesis_time); | ||||
| 
 | ||||
|     if now < genesis_time { | ||||
|         return Ok(None); | ||||
|     } | ||||
| 
 | ||||
|     let since_genesis = now - genesis_time; | ||||
| 
 | ||||
|     let elapsed_slots = since_genesis.as_secs() / seconds_per_slot; | ||||
| 
 | ||||
|     let next_slot_start_seconds = (elapsed_slots + 1) | ||||
|         .checked_mul(seconds_per_slot) | ||||
|         .expect("Next slot time should not overflow u64"); | ||||
| 
 | ||||
|     let time_to_next_slot = Duration::from_secs(next_slot_start_seconds) - since_genesis; | ||||
| 
 | ||||
|     Ok(Some(time_to_next_slot)) | ||||
| } | ||||
| 
 | ||||
| #[cfg(test)] | ||||
| mod tests { | ||||
| @ -74,6 +107,7 @@ mod tests { | ||||
|     #[test] | ||||
|     fn test_slot_now() { | ||||
|         let slot_time = 100; | ||||
|         let genesis_slot = Slot::new(0); | ||||
| 
 | ||||
|         let now = SystemTime::now(); | ||||
|         let since_epoch = now.duration_since(SystemTime::UNIX_EPOCH).unwrap(); | ||||
| @ -81,18 +115,21 @@ mod tests { | ||||
|         let genesis = since_epoch.as_secs() - slot_time * 89; | ||||
| 
 | ||||
|         let clock = SystemTimeSlotClock { | ||||
|             genesis_slot, | ||||
|             genesis_seconds: genesis, | ||||
|             slot_duration_seconds: slot_time, | ||||
|         }; | ||||
|         assert_eq!(clock.present_slot().unwrap(), Some(Slot::new(89))); | ||||
| 
 | ||||
|         let clock = SystemTimeSlotClock { | ||||
|             genesis_slot, | ||||
|             genesis_seconds: since_epoch.as_secs(), | ||||
|             slot_duration_seconds: slot_time, | ||||
|         }; | ||||
|         assert_eq!(clock.present_slot().unwrap(), Some(Slot::new(0))); | ||||
| 
 | ||||
|         let clock = SystemTimeSlotClock { | ||||
|             genesis_slot, | ||||
|             genesis_seconds: since_epoch.as_secs() - slot_time * 42 - 5, | ||||
|             slot_duration_seconds: slot_time, | ||||
|         }; | ||||
|  | ||||
| @ -1,5 +1,6 @@ | ||||
| use super::SlotClock; | ||||
| use std::sync::RwLock; | ||||
| use std::time::Duration; | ||||
| use types::Slot; | ||||
| 
 | ||||
| #[derive(Debug, PartialEq)] | ||||
| @ -32,6 +33,11 @@ impl SlotClock for TestingSlotClock { | ||||
|         let slot = *self.slot.read().expect("TestingSlotClock poisoned."); | ||||
|         Ok(Some(Slot::new(slot))) | ||||
|     } | ||||
| 
 | ||||
|     /// Always returns a duration of 1 second.
 | ||||
|     fn duration_to_next_slot(&self) -> Result<Option<Duration>, Error> { | ||||
|         Ok(Some(Duration::from_secs(1))) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[cfg(test)] | ||||
|  | ||||
| @ -14,7 +14,7 @@ package ethereum.beacon.rpc.v1; | ||||
| 
 | ||||
| // Service that currently identifies a beacon node | ||||
| service BeaconNodeService { | ||||
|     rpc Info(Empty) returns (NodeInfo); | ||||
|     rpc Info(Empty) returns (NodeInfoResponse); | ||||
| } | ||||
| 
 | ||||
| /// Service that handles block production | ||||
| @ -40,11 +40,12 @@ service AttestationService { | ||||
| /* | ||||
|  * Beacon Node Service Message | ||||
|  */ | ||||
| message NodeInfo { | ||||
| message NodeInfoResponse { | ||||
|     string version = 1; | ||||
|     Fork fork = 2; | ||||
|     uint32 chain_id = 3; | ||||
|     uint64 genesis_time = 4; | ||||
|     uint64 genesis_slot = 5; | ||||
| } | ||||
| 
 | ||||
| message Fork { | ||||
| @ -53,8 +54,7 @@ message Fork { | ||||
|     uint64 epoch = 3; | ||||
| } | ||||
| 
 | ||||
| message Empty { | ||||
| } | ||||
| message Empty {} | ||||
| 
 | ||||
| 
 | ||||
| /* | ||||
| @ -83,10 +83,7 @@ message PublishBeaconBlockResponse { | ||||
| } | ||||
| 
 | ||||
| message BeaconBlock { | ||||
| 	uint64 slot = 1; | ||||
| 	bytes block_root = 2; | ||||
| 	bytes randao_reveal = 3; | ||||
| 	bytes signature = 4; | ||||
| 	bytes ssz = 1; | ||||
| } | ||||
| 
 | ||||
| /* | ||||
|  | ||||
| @ -5,7 +5,7 @@ use protos::services::{ | ||||
| use protos::services_grpc::BeaconBlockServiceClient; | ||||
| use ssz::{ssz_encode, Decodable}; | ||||
| use std::sync::Arc; | ||||
| use types::{BeaconBlock, BeaconBlockBody, Eth1Data, Hash256, Signature, Slot}; | ||||
| use types::{BeaconBlock, Signature, Slot}; | ||||
| 
 | ||||
| /// A newtype designed to wrap the gRPC-generated service so the `BeaconNode` trait may be
 | ||||
| /// implemented upon it.
 | ||||
| @ -40,33 +40,12 @@ impl BeaconNode for BeaconBlockGrpcClient { | ||||
| 
 | ||||
|         if reply.has_block() { | ||||
|             let block = reply.get_block(); | ||||
|             let ssz = block.get_ssz(); | ||||
| 
 | ||||
|             let (signature, _) = Signature::ssz_decode(block.get_signature(), 0) | ||||
|                 .map_err(|_| BeaconNodeError::DecodeFailure)?; | ||||
|             let (block, _i) = | ||||
|                 BeaconBlock::ssz_decode(&ssz, 0).map_err(|_| BeaconNodeError::DecodeFailure)?; | ||||
| 
 | ||||
|             let (randao_reveal, _) = Signature::ssz_decode(block.get_randao_reveal(), 0) | ||||
|                 .map_err(|_| BeaconNodeError::DecodeFailure)?; | ||||
| 
 | ||||
|             // TODO: this conversion is incomplete; fix it.
 | ||||
|             Ok(Some(BeaconBlock { | ||||
|                 slot: Slot::new(block.get_slot()), | ||||
|                 previous_block_root: Hash256::zero(), | ||||
|                 state_root: Hash256::zero(), | ||||
|                 signature, | ||||
|                 body: BeaconBlockBody { | ||||
|                     randao_reveal, | ||||
|                     eth1_data: Eth1Data { | ||||
|                         deposit_root: Hash256::zero(), | ||||
|                         block_hash: Hash256::zero(), | ||||
|                     }, | ||||
|                     proposer_slashings: vec![], | ||||
|                     attester_slashings: vec![], | ||||
|                     attestations: vec![], | ||||
|                     deposits: vec![], | ||||
|                     voluntary_exits: vec![], | ||||
|                     transfers: vec![], | ||||
|                 }, | ||||
|             })) | ||||
|             Ok(Some(block)) | ||||
|         } else { | ||||
|             Ok(None) | ||||
|         } | ||||
| @ -79,12 +58,11 @@ impl BeaconNode for BeaconBlockGrpcClient { | ||||
|     fn publish_beacon_block(&self, block: BeaconBlock) -> Result<PublishOutcome, BeaconNodeError> { | ||||
|         let mut req = PublishBeaconBlockRequest::new(); | ||||
| 
 | ||||
|         let ssz = ssz_encode(&block); | ||||
| 
 | ||||
|         // TODO: this conversion is incomplete; fix it.
 | ||||
|         let mut grpc_block = GrpcBeaconBlock::new(); | ||||
|         grpc_block.set_slot(block.slot.as_u64()); | ||||
|         grpc_block.set_block_root(vec![0]); | ||||
|         grpc_block.set_randao_reveal(ssz_encode(&block.body.randao_reveal)); | ||||
|         grpc_block.set_signature(ssz_encode(&block.signature)); | ||||
|         grpc_block.set_ssz(ssz); | ||||
| 
 | ||||
|         req.set_block(grpc_block); | ||||
| 
 | ||||
|  | ||||
| @ -42,8 +42,6 @@ pub struct Service { | ||||
|     slot_clock: SystemTimeSlotClock, | ||||
|     /// The current slot we are processing.
 | ||||
|     current_slot: Slot, | ||||
|     /// Duration until the next slot. This is used for initializing the tokio timer interval.
 | ||||
|     duration_to_next_slot: Duration, | ||||
|     /// The number of slots per epoch to allow for converting slots to epochs.
 | ||||
|     slots_per_epoch: u64, | ||||
|     // GRPC Clients
 | ||||
| @ -104,6 +102,7 @@ impl Service { | ||||
| 
 | ||||
|         // build requisite objects to form Self
 | ||||
|         let genesis_time = node_info.get_genesis_time(); | ||||
|         let genesis_slot = Slot::from(node_info.get_genesis_slot()); | ||||
| 
 | ||||
|         info!(log,"Beacon node connected"; "Node Version" => node_info.version.clone(), "Chain ID" => node_info.chain_id, "Genesis time" => genesis_time); | ||||
| 
 | ||||
| @ -139,46 +138,21 @@ impl Service { | ||||
|         }; | ||||
| 
 | ||||
|         // build the validator slot clock
 | ||||
|         let slot_clock = SystemTimeSlotClock::new(genesis_time, config.spec.seconds_per_slot) | ||||
|             .expect("Unable to instantiate SystemTimeSlotClock."); | ||||
|         let slot_clock = | ||||
|             SystemTimeSlotClock::new(genesis_slot, genesis_time, config.spec.seconds_per_slot) | ||||
|                 .expect("Unable to instantiate SystemTimeSlotClock."); | ||||
| 
 | ||||
|         let current_slot = slot_clock | ||||
|             .present_slot() | ||||
|             .map_err(|e| ErrorKind::SlotClockError(e))? | ||||
|             .expect("Genesis must be in the future"); | ||||
| 
 | ||||
|         // calculate the duration to the next slot
 | ||||
|         let duration_to_next_slot = { | ||||
|             let seconds_per_slot = config.spec.seconds_per_slot; | ||||
|             let syslot_time = SystemTime::now(); | ||||
|             let duration_since_epoch = syslot_time | ||||
|                 .duration_since(SystemTime::UNIX_EPOCH) | ||||
|                 .map_err(|e| ErrorKind::SystemTimeError(e.to_string()))?; | ||||
|             let duration_since_genesis = duration_since_epoch | ||||
|                 .checked_sub(Duration::from_secs(genesis_time)) | ||||
|                 .expect("Genesis must be in the future. Checked on connection"); | ||||
|             let elapsed_slots = duration_since_epoch | ||||
|                 .as_secs() | ||||
|                 .checked_div(seconds_per_slot as u64) | ||||
|                 .expect("Seconds per slot should not be 0"); | ||||
| 
 | ||||
|             // the duration to the next slot
 | ||||
|             Duration::from_secs( | ||||
|                 (elapsed_slots + 1) | ||||
|                     .checked_mul(seconds_per_slot) | ||||
|                     .expect("Next slot time should not overflow u64"), | ||||
|             ) | ||||
|             .checked_sub(duration_since_genesis) | ||||
|             .expect("This should never saturate") | ||||
|         }; | ||||
| 
 | ||||
|         Ok(Self { | ||||
|             connected_node_version: node_info.version, | ||||
|             chain_id: node_info.chain_id as u16, | ||||
|             fork, | ||||
|             slot_clock, | ||||
|             current_slot, | ||||
|             duration_to_next_slot, | ||||
|             slots_per_epoch: config.spec.slots_per_epoch, | ||||
|             beacon_block_client, | ||||
|             validator_client, | ||||
| @ -201,15 +175,18 @@ impl Service { | ||||
|             .build() | ||||
|             .map_err(|e| format!("Tokio runtime failed: {}", e))?; | ||||
| 
 | ||||
|         let duration_to_next_slot = service | ||||
|             .slot_clock | ||||
|             .duration_to_next_slot() | ||||
|             .map_err(|e| format!("System clock error: {:?}", e))? | ||||
|             .expect("Cannot start before genesis"); | ||||
| 
 | ||||
|         // set up the validator work interval - start at next slot and proceed every slot
 | ||||
|         let interval = { | ||||
|             // Set the interval to start at the next slot, and every slot after
 | ||||
|             let slot_duration = Duration::from_secs(config.spec.seconds_per_slot); | ||||
|             //TODO: Handle checked add correctly
 | ||||
|             Interval::new( | ||||
|                 Instant::now() + service.duration_to_next_slot, | ||||
|                 slot_duration, | ||||
|             ) | ||||
|             Interval::new(Instant::now() + duration_to_next_slot, slot_duration) | ||||
|         }; | ||||
| 
 | ||||
|         /* kick off core service */ | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user