Thread beacon node RPC server

This commit is contained in:
Age Manning 2019-03-22 16:46:52 +11:00
parent 858cf4f1f4
commit a4cfe68272
No known key found for this signature in database
GPG Key ID: 05EED64B79E06A93
4 changed files with 41 additions and 21 deletions

View File

@ -24,12 +24,8 @@ pub struct Client<T: ClientTypes> {
beacon_chain: Arc<BeaconChain<T::DB, T::SlotClock, T::ForkChoice>>, beacon_chain: Arc<BeaconChain<T::DB, T::SlotClock, T::ForkChoice>>,
/// Reference to the network service. /// Reference to the network service.
pub network: Arc<NetworkService>, pub network: Arc<NetworkService>,
/// Future to stop and begin shutdown of the Client. /// Signal to terminate the RPC server.
//TODO: Decide best way to handle shutdown pub rpc_exit_signal: Option<Signal>,
pub exit: exit_future::Exit,
/// The sending future to call to terminate the Client.
//TODO: Decide best way to handle shutdown
pub exit_signal: Signal,
/// The clients logger. /// The clients logger.
log: slog::Logger, log: slog::Logger,
/// Marker to pin the beacon chain generics. /// Marker to pin the beacon chain generics.
@ -43,8 +39,6 @@ impl<TClientType: ClientTypes> Client<TClientType> {
log: slog::Logger, log: slog::Logger,
executor: &TaskExecutor, executor: &TaskExecutor,
) -> error::Result<Self> { ) -> error::Result<Self> {
let (exit_signal, exit) = exit_future::signal();
// generate a beacon chain // generate a beacon chain
let beacon_chain = TClientType::initialise_beacon_chain(&config); let beacon_chain = TClientType::initialise_beacon_chain(&config);
@ -59,16 +53,23 @@ impl<TClientType: ClientTypes> Client<TClientType> {
network_logger, network_logger,
)?; )?;
let mut rpc_exit_signal = None;
// spawn the RPC server // spawn the RPC server
if config.rpc_conf.enabled { if config.rpc_conf.enabled {
rpc::start_server(&config.rpc_conf, beacon_chain.clone(), &log); rpc_exit_signal = Some(rpc::start_server(
&config.rpc_conf,
executor,
beacon_chain.clone(),
&log,
));
} }
println!("Here");
Ok(Client { Ok(Client {
config, config,
beacon_chain, beacon_chain,
exit, rpc_exit_signal,
exit_signal,
log, log,
network, network,
phantom: PhantomData, phantom: PhantomData,

View File

@ -21,3 +21,5 @@ futures = "0.1.23"
slog = "^2.2.3" slog = "^2.2.3"
slog-term = "^2.4.0" slog-term = "^2.4.0"
slog-async = "^2.3.0" slog-async = "^2.3.0"
tokio = "0.1.17"
exit-future = "0.1.4"

View File

@ -9,24 +9,28 @@ use self::beacon_chain::BeaconChain;
use self::beacon_node::BeaconNodeServiceInstance; use self::beacon_node::BeaconNodeServiceInstance;
use self::validator::ValidatorServiceInstance; use self::validator::ValidatorServiceInstance;
pub use config::Config as RPCConfig; pub use config::Config as RPCConfig;
use futures::{future, Future};
use grpcio::{Environment, Server, ServerBuilder}; use grpcio::{Environment, Server, ServerBuilder};
use protos::services_grpc::{ use protos::services_grpc::{
create_beacon_block_service, create_beacon_node_service, create_validator_service, create_beacon_block_service, create_beacon_node_service, create_validator_service,
}; };
use slog::{info, o, warn};
use std::sync::Arc; use std::sync::Arc;
use tokio::runtime::TaskExecutor;
use slog::{info, o};
pub fn start_server( pub fn start_server(
config: &RPCConfig, config: &RPCConfig,
executor: &TaskExecutor,
beacon_chain: Arc<BeaconChain>, beacon_chain: Arc<BeaconChain>,
log: &slog::Logger, log: &slog::Logger,
) -> Server { ) -> exit_future::Signal {
let log = log.new(o!("Service"=>"RPC")); let log = log.new(o!("Service"=>"RPC"));
let env = Arc::new(Environment::new(1)); let env = Arc::new(Environment::new(1));
// build the individual rpc services // build a channel to kill the rpc server
let (rpc_exit_signal, rpc_exit) = exit_future::signal();
// build the individual rpc services
let beacon_node_service = { let beacon_node_service = {
let instance = BeaconNodeServiceInstance { let instance = BeaconNodeServiceInstance {
chain: beacon_chain.clone(), chain: beacon_chain.clone(),
@ -50,9 +54,22 @@ pub fn start_server(
.bind(config.listen_address.to_string(), config.port) .bind(config.listen_address.to_string(), config.port)
.build() .build()
.unwrap(); .unwrap();
server.start();
for &(ref host, port) in server.bind_addrs() { let spawn_rpc = {
info!(log, "gRPC listening on {}:{}", host, port); server.start();
} for &(ref host, port) in server.bind_addrs() {
server info!(log, "gRPC listening on {}:{}", host, port);
}
rpc_exit.and_then(move |_| {
info!(log, "RPC Server shutting down");
server
.shutdown()
.wait()
.map(|_| ())
.map_err(|e| warn!(log, "RPC server failed to shutdown: {:?}", e))?;
Ok(())
})
};
executor.spawn(spawn_rpc);
rpc_exit_signal
} }

View File

@ -21,7 +21,7 @@ impl ClientConfig {
}; };
fs::create_dir_all(&data_dir) fs::create_dir_all(&data_dir)
.unwrap_or_else(|_| panic!("Unable to create {:?}", &data_dir)); .unwrap_or_else(|_| panic!("Unable to create {:?}", &data_dir));
let server = "localhost:50051".to_string(); let server = "localhost:5051".to_string();
let spec = ChainSpec::foundation(); let spec = ChainSpec::foundation();
Self { Self {
data_dir, data_dir,