From 110e627d7b4b91bb55f5fdbfb54d38ccb313dce0 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Sat, 14 Sep 2019 10:34:03 -0400 Subject: [PATCH] Add basic, not-useful websocket server --- Cargo.toml | 1 + beacon_node/client/Cargo.toml | 1 + beacon_node/client/src/config.rs | 2 + beacon_node/client/src/lib.rs | 5 ++ beacon_node/websocket_server/Cargo.toml | 17 ++++++ beacon_node/websocket_server/src/lib.rs | 75 +++++++++++++++++++++++++ 6 files changed, 101 insertions(+) create mode 100644 beacon_node/websocket_server/Cargo.toml create mode 100644 beacon_node/websocket_server/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 3600c90ca..9b31060a2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ members = [ "beacon_node/rpc", "beacon_node/version", "beacon_node/beacon_chain", + "beacon_node/websocket_server", "tests/ef_tests", "lcli", "protos", diff --git a/beacon_node/client/Cargo.toml b/beacon_node/client/Cargo.toml index 9b5a9cf42..383318b0d 100644 --- a/beacon_node/client/Cargo.toml +++ b/beacon_node/client/Cargo.toml @@ -10,6 +10,7 @@ network = { path = "../network" } eth2-libp2p = { path = "../eth2-libp2p" } rpc = { path = "../rpc" } rest_api = { path = "../rest_api" } +websocket_server = { path = "../websocket_server" } prometheus = "^0.6" types = { path = "../../eth2/types" } tree_hash = "0.1" diff --git a/beacon_node/client/src/config.rs b/beacon_node/client/src/config.rs index 5b0553c5b..1e07e7cf2 100644 --- a/beacon_node/client/src/config.rs +++ b/beacon_node/client/src/config.rs @@ -27,6 +27,7 @@ pub struct Config { pub network: network::NetworkConfig, pub rpc: rpc::RPCConfig, pub rest_api: rest_api::ApiConfig, + pub websocket_server: websocket_server::Config, } /// Defines how the client should initialize a BeaconChain. @@ -96,6 +97,7 @@ impl Default for Config { network: NetworkConfig::new(), rpc: <_>::default(), rest_api: <_>::default(), + websocket_server: <_>::default(), spec_constants: TESTNET_SPEC_CONSTANTS.into(), beacon_chain_start_method: <_>::default(), eth1_backend_method: <_>::default(), diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs index afcd538b5..08674166d 100644 --- a/beacon_node/client/src/lib.rs +++ b/beacon_node/client/src/lib.rs @@ -229,6 +229,11 @@ where None }; + // Start the websocket server + let _websocket_sender = if client_config.websocket_server.enabled { + websocket_server::start_server::(&client_config.websocket_server, &log)?; + }; + let (slot_timer_exit_signal, exit) = exit_future::signal(); if let Some(duration_to_next_slot) = beacon_chain.slot_clock.duration_to_next_slot() { // set up the validator work interval - start at next slot and proceed every slot diff --git a/beacon_node/websocket_server/Cargo.toml b/beacon_node/websocket_server/Cargo.toml new file mode 100644 index 000000000..f846f62b7 --- /dev/null +++ b/beacon_node/websocket_server/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "websocket_server" +version = "0.1.0" +authors = ["Paul Hauner "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +exit-future = "0.1.3" +futures = "0.1.25" +serde = "1.0" +serde_derive = "1.0" +slog = "^2.2.3" +tokio = "0.1.16" +types = { path = "../../eth2/types" } +ws = "0.9" diff --git a/beacon_node/websocket_server/src/lib.rs b/beacon_node/websocket_server/src/lib.rs new file mode 100644 index 000000000..eb28b10be --- /dev/null +++ b/beacon_node/websocket_server/src/lib.rs @@ -0,0 +1,75 @@ +use serde_derive::{Deserialize, Serialize}; +use slog::{error, info, Logger}; +use std::net::Ipv4Addr; +use std::thread; +use types::EthSpec; +use ws::{Sender, WebSocket}; + +/// The core configuration of a Lighthouse beacon node. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Config { + pub enabled: bool, + /// The IPv4 address the REST API HTTP server will listen on. + pub listen_address: Ipv4Addr, + /// The port the REST API HTTP server will listen on. + pub port: u16, +} + +impl Default for Config { + fn default() -> Self { + Config { + enabled: true, + listen_address: Ipv4Addr::new(127, 0, 0, 1), + port: 5053, + } + } +} + +pub struct WebSocketSender { + sender: Sender, +} + +impl WebSocketSender { + pub fn send_string(&self, string: String) -> Result<(), String> { + self.sender + .send(string) + .map_err(|e| format!("Unable to broadcast to websocket clients: {:?}", e)) + } +} + +pub fn start_server(config: &Config, log: &Logger) -> Result { + let server_string = format!("{}:{}", config.listen_address, config.port); + + info!( + log, + "Websocket server starting"; + "listen_address" => &server_string + ); + + // Create a server that simply ignores any incoming messages. + let server = WebSocket::new(|_| |_| Ok(())) + .map_err(|e| format!("Failed to initialize websocket server: {:?}", e))?; + + let broadcaster = server.broadcaster(); + + let log_inner = log.clone(); + let _handle = thread::spawn(move || match server.listen(server_string) { + Ok(_) => { + info!( + log_inner, + "Websocket server stopped"; + ); + } + Err(e) => { + error!( + log_inner, + "Websocket server failed to start"; + "error" => format!("{:?}", e) + ); + } + }); + + Ok(WebSocketSender { + sender: broadcaster, + }) +}