Adds error handling to validator client service
This commit is contained in:
		
							parent
							
								
									4fdb01e5f0
								
							
						
					
					
						commit
						ca9af49d4e
					
				| @ -64,8 +64,6 @@ impl<TClientType: ClientTypes> Client<TClientType> { | ||||
|             )); | ||||
|         } | ||||
| 
 | ||||
|         println!("Here"); | ||||
| 
 | ||||
|         Ok(Client { | ||||
|             config, | ||||
|             beacon_chain, | ||||
|  | ||||
| @ -21,3 +21,4 @@ slog-term = "^2.4.0" | ||||
| slog-async = "^2.3.0" | ||||
| tokio = "0.1.18" | ||||
| tokio-timer = "0.2.10" | ||||
| error-chain = "0.12.0" | ||||
|  | ||||
							
								
								
									
										22
									
								
								validator_client/src/error.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										22
									
								
								validator_client/src/error.rs
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,22 @@ | ||||
| use slot_clock; | ||||
| 
 | ||||
| use error_chain::{ | ||||
|     error_chain, error_chain_processing, impl_error_chain_kind, impl_error_chain_processed, | ||||
|     impl_extract_backtrace, | ||||
| }; | ||||
| 
 | ||||
| error_chain! { | ||||
|    links  { } | ||||
| 
 | ||||
|    errors { | ||||
|     SlotClockError(e: slot_clock::SystemTimeSlotClockError) { | ||||
|         description("Error reading system time"), | ||||
|         display("SlotClockError: '{:?}'", e) | ||||
|     } | ||||
| 
 | ||||
|     SystemTimeError(t: String ) { | ||||
|         description("Error reading system time"), | ||||
|         display("SystemTimeError: '{}'", t) | ||||
|     } | ||||
|    } | ||||
| } | ||||
| @ -2,12 +2,13 @@ mod attester_service; | ||||
| mod block_producer_service; | ||||
| mod config; | ||||
| mod duties; | ||||
| pub mod error; | ||||
| mod service; | ||||
| 
 | ||||
| use crate::config::Config as ValidatorConfig; | ||||
| use clap::{App, Arg}; | ||||
| use service::Service as ValidatorService; | ||||
| use slog::{o, Drain}; | ||||
| use slog::{error, info, o, Drain}; | ||||
| 
 | ||||
| fn main() { | ||||
|     // Logging
 | ||||
| @ -50,5 +51,8 @@ fn main() { | ||||
|     let config = ValidatorConfig::parse_args(matches, &log).unwrap(); | ||||
| 
 | ||||
|     // start the validator service.
 | ||||
|     ValidatorService::start(config, log); | ||||
|     match ValidatorService::start(config, log.clone()) { | ||||
|         Ok(_) => info!(log, "Validator client shutdown successfully."), | ||||
|         Err(e) => error!(log, "Validator exited due to {:?}", e), | ||||
|     } | ||||
| } | ||||
|  | ||||
| @ -3,6 +3,8 @@ use crate::attester_service::{AttestationGrpcClient, AttesterService}; | ||||
| use crate::block_producer_service::{BeaconBlockGrpcClient, BlockProducerService}; | ||||
| use crate::config::Config as ValidatorConfig; | ||||
| use crate::duties::{DutiesManager, DutiesManagerService, EpochDutiesMap}; | ||||
| use crate::error as error_chain; | ||||
| use crate::error::ErrorKind; | ||||
| use attester::test_utils::EpochMap; | ||||
| use attester::{test_utils::LocalSigner as AttesterLocalSigner, Attester}; | ||||
| use block_proposer::{test_utils::LocalSigner as BlockProposerLocalSigner, BlockProducer}; | ||||
| @ -13,9 +15,8 @@ use protos::services_grpc::{ | ||||
|     AttestationServiceClient, BeaconBlockServiceClient, BeaconNodeServiceClient, | ||||
|     ValidatorServiceClient, | ||||
| }; | ||||
| use slog::{debug, info, warn}; | ||||
| use slog::{debug, error, info, warn}; | ||||
| use slot_clock::{SlotClock, SystemTimeSlotClock}; | ||||
| use std::ops::Sub; | ||||
| use std::sync::Arc; | ||||
| use std::time::{Duration, Instant, SystemTime}; | ||||
| use tokio::prelude::*; | ||||
| @ -57,7 +58,10 @@ impl Service { | ||||
|     ///
 | ||||
|     ///  This tries to connect to a beacon node. Once connected, it initialised the gRPC clients
 | ||||
|     ///  and returns an instance of the service.
 | ||||
|     fn initialize_service(config: &ValidatorConfig, log: slog::Logger) -> Self { | ||||
|     fn initialize_service( | ||||
|         config: &ValidatorConfig, | ||||
|         log: slog::Logger, | ||||
|     ) -> error_chain::Result<Self> { | ||||
|         // initialise the beacon node client to check for a connection
 | ||||
| 
 | ||||
|         let env = Arc::new(EnvBuilder::new().build()); | ||||
| @ -86,7 +90,7 @@ impl Service { | ||||
|                             log, | ||||
|                             "Beacon Node's genesis time is in the future. No work to do.\n Exiting" | ||||
|                         ); | ||||
|                         //                        return Err("Genesis Time in the future");
 | ||||
|                         return Err("Genesis time in the future".into()); | ||||
|                     } | ||||
|                     break info; | ||||
|                 } | ||||
| @ -136,35 +140,37 @@ impl Service { | ||||
|             Arc::new(AttestationServiceClient::new(ch)) | ||||
|         }; | ||||
| 
 | ||||
|         //TODO: Add error chain. Handle errors
 | ||||
|         let current_slot = slot_clock.present_slot().unwrap().unwrap().sub(1); | ||||
|         let current_slot = slot_clock | ||||
|             .present_slot() | ||||
|             .map_err(|e| ErrorKind::SlotClockError(e))? | ||||
|             .expect("Genesis must be in the future"); | ||||
| 
 | ||||
|         // calculate the duration to the next slot
 | ||||
|         let duration_to_next_slot = { | ||||
|             let seconds_per_slot = config.spec.seconds_per_slot; | ||||
|             let syslot_time = SystemTime::now(); | ||||
|             let duration_since_epoch = syslot_time.duration_since(SystemTime::UNIX_EPOCH).unwrap(); | ||||
|             let mut duration_to_next_slot = None; | ||||
|             if let Some(duration_since_genesis) = | ||||
|                 duration_since_epoch.checked_sub(Duration::from_secs(genesis_time)) | ||||
|             { | ||||
|                 let elapsed_slots = duration_since_epoch | ||||
|                     .as_secs() | ||||
|                     .checked_div(config.spec.seconds_per_slot as u64) | ||||
|                     .unwrap(); | ||||
|                 duration_to_next_slot = Some( | ||||
|                     Duration::from_secs( | ||||
|                         (elapsed_slots + 1) | ||||
|                             .checked_mul(config.spec.seconds_per_slot) | ||||
|                             .unwrap(), | ||||
|                     ) | ||||
|                     .checked_sub(duration_since_genesis) | ||||
|                     .expect("This should never saturate"), | ||||
|                 ); | ||||
|             } | ||||
|             duration_to_next_slot.unwrap_or_else(|| Duration::from_secs(0)) | ||||
|             let duration_since_epoch = syslot_time | ||||
|                 .duration_since(SystemTime::UNIX_EPOCH) | ||||
|                 .map_err(|e| ErrorKind::SystemTimeError(e.to_string()))?; | ||||
|             let duration_since_genesis = duration_since_epoch | ||||
|                 .checked_sub(Duration::from_secs(genesis_time)) | ||||
|                 .expect("Genesis must be in the future. Checked on connection"); | ||||
|             let elapsed_slots = duration_since_epoch | ||||
|                 .as_secs() | ||||
|                 .checked_div(seconds_per_slot as u64) | ||||
|                 .expect("Seconds per slot should not be 0"); | ||||
| 
 | ||||
|             // the duration to the next slot
 | ||||
|             Duration::from_secs( | ||||
|                 (elapsed_slots + 1) | ||||
|                     .checked_mul(seconds_per_slot) | ||||
|                     .expect("Next slot time should not overflow u64"), | ||||
|             ) | ||||
|             .checked_sub(duration_since_genesis) | ||||
|             .expect("This should never saturate") | ||||
|         }; | ||||
| 
 | ||||
|         Self { | ||||
|         Ok(Self { | ||||
|             connected_node_version: node_info.version, | ||||
|             chain_id: node_info.chain_id as u16, | ||||
|             fork, | ||||
| @ -175,13 +181,13 @@ impl Service { | ||||
|             validator_client, | ||||
|             attester_client, | ||||
|             log, | ||||
|         } | ||||
|         }) | ||||
|     } | ||||
| 
 | ||||
|     /// Initialise the service then run the core thread.
 | ||||
|     pub fn start(config: ValidatorConfig, log: slog::Logger) { | ||||
|     pub fn start(config: ValidatorConfig, log: slog::Logger) -> error_chain::Result<()> { | ||||
|         // connect to the node and retrieve its properties and initialize the gRPC clients
 | ||||
|         let service = Service::initialize_service(&config, log); | ||||
|         let service = Service::initialize_service(&config, log)?; | ||||
| 
 | ||||
|         // we have connected to a node and established its parameters. Spin up the core service
 | ||||
| 
 | ||||
| @ -190,10 +196,9 @@ impl Service { | ||||
|             .clock(Clock::system()) | ||||
|             .name_prefix("validator-client-") | ||||
|             .build() | ||||
|             .unwrap(); | ||||
|             .map_err(|e| format!("Tokio runtime failed: {}", e))?; | ||||
| 
 | ||||
|         // set up the validator work interval - start at next slot and proceed every slot
 | ||||
|         // TODO: Error chain handle errors.
 | ||||
|         let interval = { | ||||
|             // Set the interval to start at the next slot, and every slot after
 | ||||
|             let slot_duration = Duration::from_secs(config.spec.seconds_per_slot); | ||||
| @ -223,16 +228,23 @@ impl Service { | ||||
|             beacon_node: service.validator_client.clone(), | ||||
|         }; | ||||
| 
 | ||||
|         runtime.block_on(interval.for_each(move |_| { | ||||
|             // update duties
 | ||||
|             debug!( | ||||
|                 service.log, | ||||
|                 "Processing slot: {}", | ||||
|                 service.slot_clock.present_slot().unwrap().unwrap().as_u64() | ||||
|             ); | ||||
|             manager.poll(); | ||||
|             Ok(()) | ||||
|         })); | ||||
|         runtime | ||||
|             .block_on(interval.for_each(move |_| { | ||||
|                 // update duties
 | ||||
|                 let current_slot = match service.slot_clock.present_slot() { | ||||
|                     Err(e) => { | ||||
|                         error!(service.log, "SystemTimeError {:?}", e); | ||||
|                         return Ok(()); | ||||
|                     } | ||||
|                     Ok(slot) => slot.expect("Genesis is in the future"), | ||||
|                 }; | ||||
| 
 | ||||
|                 debug!(service.log, "Processing slot: {}", current_slot.as_u64()); | ||||
|                 manager.poll(); | ||||
|                 Ok(()) | ||||
|             })) | ||||
|             .map_err(|e| format!("Service thread failed: {:?}", e))?; | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     /* | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user