diff --git a/Cargo.lock b/Cargo.lock index d0b63d902..e6f8d96f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -31,7 +31,7 @@ dependencies = [ "slog-term", "slot_clock", "tempfile", - "tokio 0.3.4", + "tokio 0.3.5", "tokio-compat-02", "types", "validator_dir", @@ -454,12 +454,6 @@ dependencies = [ "autocfg 1.0.1", ] -[[package]] -name = "atomic-option" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0db678acb667b525ac40a324fc5f7d3390e29239b31c7327bb8157f5b4fff593" - [[package]] name = "atomic-waker" version = "1.0.0" @@ -559,7 +553,6 @@ version = "0.2.0" dependencies = [ "bitvec 0.19.4", "bls", - "bus", "derivative", "environment", "eth1", @@ -603,10 +596,9 @@ dependencies = [ "store", "task_executor", "tempfile", - "tokio 0.3.4", + "tokio 0.3.5", "tree_hash", "types", - "websocket_server", ] [[package]] @@ -641,7 +633,7 @@ dependencies = [ "slog-term", "store", "task_executor", - "tokio 0.3.4", + "tokio 0.3.5", "tokio-compat-02", "types", ] @@ -859,7 +851,7 @@ dependencies = [ "slog-stdlog", "slog-term", "sloggers", - "tokio 0.3.4", + "tokio 0.3.5", "types", ] @@ -903,18 +895,6 @@ version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2e8c087f005730276d1096a652e92a8bacee2e2472bcc9715a74d2bec38b5820" -[[package]] -name = "bus" -version = "2.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1e66e1779f5b1440f1a58220ba3b3ded4427175f0a9fb8d7066521f8b4e8f2b" -dependencies = [ - "atomic-option", - "crossbeam-channel 0.4.4", - "num_cpus", - "parking_lot_core 0.7.2", -] - [[package]] name = "byte-slice-cast" version = "0.3.5" @@ -1102,7 +1082,6 @@ name = "client" version = "0.2.0" dependencies = [ "beacon_chain", - "bus", "directory", "dirs 3.0.1", "environment", @@ -1133,12 +1112,11 @@ dependencies = [ "task_executor", "time 0.2.23", "timer", - "tokio 0.3.4", + "tokio 0.3.5", "toml", "tree_hash", "types", "url 2.2.0", - "websocket_server", ] [[package]] @@ -1733,7 +1711,7 @@ dependencies = [ "rlp", "sha2 0.9.2", "smallvec 1.5.0", - "tokio 0.3.4", + "tokio 0.3.5", "tokio-util 0.5.0", "tracing", "tracing-subscriber", @@ -1765,7 +1743,7 @@ dependencies = [ "rlp", "sha2 0.9.2", "smallvec 1.5.0", - "tokio 0.3.4", + "tokio 0.3.5", "tokio-util 0.4.0", "tracing", "tracing-subscriber", @@ -1938,7 +1916,7 @@ dependencies = [ "slog-term", "sloggers", "task_executor", - "tokio 0.3.4", + "tokio 0.3.5", "types", ] @@ -1977,7 +1955,7 @@ dependencies = [ "sloggers", "state_processing", "task_executor", - "tokio 0.3.4", + "tokio 0.3.5", "tokio-compat-02", "toml", "tree_hash", @@ -1992,7 +1970,7 @@ dependencies = [ "deposit_contract", "futures 0.3.8", "serde_json", - "tokio 0.3.4", + "tokio 0.3.5", "types", "web3", ] @@ -2007,6 +1985,8 @@ dependencies = [ "eth2_libp2p", "eth2_ssz", "eth2_ssz_derive", + "futures 0.3.8", + "futures-util", "hex", "libsecp256k1", "procinfo", @@ -2128,7 +2108,7 @@ dependencies = [ "task_executor", "tempdir", "tiny-keccak 2.0.2", - "tokio 0.3.4", + "tokio 0.3.5", "tokio-io-timeout", "tokio-util 0.4.0", "types", @@ -2646,7 +2626,7 @@ dependencies = [ "serde_derive", "slog", "state_processing", - "tokio 0.3.4", + "tokio 0.3.5", "tokio-compat-02", "tree_hash", "types", @@ -2781,7 +2761,7 @@ dependencies = [ "http 0.2.1 (git+https://github.com/agemanning/http?branch=lighthouse)", "indexmap", "slab 0.4.2", - "tokio 0.3.4", + "tokio 0.3.5", "tokio-util 0.5.0", "tracing", "tracing-futures", @@ -2816,7 +2796,7 @@ name = "hashset_delay" version = "0.2.0" dependencies = [ "futures 0.3.8", - "tokio 0.3.4", + "tokio 0.3.5", "tokio-util 0.4.0", ] @@ -3045,6 +3025,7 @@ dependencies = [ "eth2_libp2p", "eth2_ssz", "fork_choice", + "futures 0.3.8", "hex", "lazy_static", "lighthouse_metrics", @@ -3056,7 +3037,7 @@ dependencies = [ "slot_clock", "state_processing", "store", - "tokio 0.3.4", + "tokio 0.3.5", "tokio-compat-02", "tree_hash", "types", @@ -3081,7 +3062,7 @@ dependencies = [ "slog", "slot_clock", "store", - "tokio 0.3.4", + "tokio 0.3.5", "tokio-compat-02", "types", "warp", @@ -3226,7 +3207,7 @@ dependencies = [ "itoa", "pin-project 1.0.2", "socket2", - "tokio 0.3.4", + "tokio 0.3.5", "tower-service", "tracing", "want 0.3.0", @@ -3556,12 +3537,6 @@ dependencies = [ "spin", ] -[[package]] -name = "lazycell" -version = "1.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" - [[package]] name = "lcli" version = "1.0.3" @@ -3588,7 +3563,7 @@ dependencies = [ "serde_yaml", "simple_logger", "state_processing", - "tokio 0.3.4", + "tokio 0.3.5", "tokio-compat-02", "tree_hash", "types", @@ -3882,7 +3857,7 @@ dependencies = [ "libp2p-core 0.25.0", "log 0.4.11", "socket2", - "tokio 0.3.4", + "tokio 0.3.5", ] [[package]] @@ -3980,7 +3955,7 @@ dependencies = [ "slog-term", "sloggers", "tempfile", - "tokio 0.3.4", + "tokio 0.3.5", "tokio-compat-02", "types", "validator_client", @@ -4250,9 +4225,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.6.22" +version = "0.6.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fce347092656428bc8eaf6201042cb551b8d67855af7374542a92a0fbfcac430" +checksum = "4afd66f5b91bf2a3bc13fad0e21caedac168ca4c707504e75585648ae80e4cc4" dependencies = [ "cfg-if 0.1.10", "fuchsia-zircon", @@ -4280,18 +4255,6 @@ dependencies = [ "winapi 0.3.9", ] -[[package]] -name = "mio-extras" -version = "2.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52403fe290012ce777c4626790c8951324a2b9e3316b3143779c72b029742f19" -dependencies = [ - "lazycell", - "log 0.4.11", - "mio 0.6.22", - "slab 0.4.2", -] - [[package]] name = "mio-named-pipes" version = "0.1.7" @@ -4299,7 +4262,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0840c1c50fd55e521b247f949c241c9997709f23bd7f023b9762cd561e935656" dependencies = [ "log 0.4.11", - "mio 0.6.22", + "mio 0.6.23", "miow 0.3.6", "winapi 0.3.9", ] @@ -4312,7 +4275,7 @@ checksum = "afcb699eb26d4332647cc848492bbc15eafb26f08d0304550d5aa1f612e066f0" dependencies = [ "iovec", "libc", - "mio 0.6.22", + "mio 0.6.23", ] [[package]] @@ -4505,7 +4468,7 @@ dependencies = [ "store", "task_executor", "tempfile", - "tokio 0.3.4", + "tokio 0.3.5", "tree_hash", "types", ] @@ -6208,7 +6171,7 @@ dependencies = [ "node_test_rig", "parking_lot 0.11.1", "rayon", - "tokio 0.3.4", + "tokio 0.3.5", "tokio-compat-02", "types", "validator_client", @@ -6720,7 +6683,7 @@ dependencies = [ "lazy_static", "lighthouse_metrics", "slog", - "tokio 0.3.4", + "tokio 0.3.5", "tokio-compat-02", ] @@ -6892,7 +6855,7 @@ dependencies = [ "slog", "slot_clock", "task_executor", - "tokio 0.3.4", + "tokio 0.3.5", "types", ] @@ -6965,7 +6928,7 @@ checksum = "5a09c0b5bb588872ab2f09afa13ee6e9dac11e10a0ec9e8e3ba39a5a5d530af6" dependencies = [ "bytes 0.4.12", "futures 0.1.30", - "mio 0.6.22", + "mio 0.6.23", "num_cpus", "tokio-codec", "tokio-current-thread", @@ -6994,7 +6957,7 @@ dependencies = [ "lazy_static", "libc", "memchr", - "mio 0.6.22", + "mio 0.6.23", "mio-named-pipes", "mio-uds", "num_cpus", @@ -7007,9 +6970,9 @@ dependencies = [ [[package]] name = "tokio" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9dfe2523e6fa84ddf5e688151d4e5fddc51678de9752c6512a24714c23818d61" +checksum = "a12a3eb39ee2c231be64487f1fcbe726c8f2514876a55480a5ab8559fc374252" dependencies = [ "autocfg 1.0.1", "bytes 0.6.0", @@ -7059,7 +7022,7 @@ dependencies = [ "once_cell", "pin-project-lite 0.1.11", "tokio 0.2.23", - "tokio 0.3.4", + "tokio 0.3.5", ] [[package]] @@ -7072,7 +7035,7 @@ dependencies = [ "futures 0.1.30", "iovec", "log 0.4.11", - "mio 0.6.22", + "mio 0.6.23", "scoped-tls 0.1.2", "tokio 0.1.22", "tokio-executor", @@ -7129,7 +7092,7 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6654a6da4326b0b4228000891d44fbcbdaa1904c6ddfa06617230649073be8fb" dependencies = [ - "tokio 0.3.4", + "tokio 0.3.5", ] [[package]] @@ -7164,7 +7127,7 @@ dependencies = [ "futures 0.1.30", "lazy_static", "log 0.4.11", - "mio 0.6.22", + "mio 0.6.23", "num_cpus", "parking_lot 0.9.0", "slab 0.4.2", @@ -7192,7 +7155,7 @@ dependencies = [ "bytes 0.4.12", "futures 0.1.30", "iovec", - "mio 0.6.22", + "mio 0.6.23", "tokio-io", "tokio-reactor", ] @@ -7266,7 +7229,7 @@ dependencies = [ "futures-util", "log 0.4.11", "pin-project 1.0.2", - "tokio 0.3.4", + "tokio 0.3.5", "tungstenite", ] @@ -7279,7 +7242,7 @@ dependencies = [ "bytes 0.4.12", "futures 0.1.30", "log 0.4.11", - "mio 0.6.22", + "mio 0.6.23", "tokio-codec", "tokio-io", "tokio-reactor", @@ -7296,7 +7259,7 @@ dependencies = [ "iovec", "libc", "log 0.3.9", - "mio 0.6.22", + "mio 0.6.23", "mio-uds", "tokio-core", "tokio-io", @@ -7313,7 +7276,7 @@ dependencies = [ "iovec", "libc", "log 0.4.11", - "mio 0.6.22", + "mio 0.6.23", "mio-uds", "tokio-codec", "tokio-io", @@ -7347,7 +7310,7 @@ dependencies = [ "log 0.4.11", "pin-project-lite 0.1.11", "slab 0.4.2", - "tokio 0.3.4", + "tokio 0.3.5", ] [[package]] @@ -7362,7 +7325,7 @@ dependencies = [ "log 0.4.11", "pin-project-lite 0.1.11", "slab 0.4.2", - "tokio 0.3.4", + "tokio 0.3.5", ] [[package]] @@ -7794,7 +7757,7 @@ dependencies = [ "slot_clock", "tempdir", "tempfile", - "tokio 0.3.4", + "tokio 0.3.5", "tokio-compat-02", "tree_hash", "types", @@ -7915,7 +7878,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", - "tokio 0.3.4", + "tokio 0.3.5", "tokio-tungstenite", "tower-service", "tracing", @@ -7934,7 +7897,7 @@ dependencies = [ "safe_arith", "serde", "state_processing", - "tokio 0.3.4", + "tokio 0.3.5", "types", "warp", ] @@ -8142,20 +8105,6 @@ dependencies = [ "url 1.7.2", ] -[[package]] -name = "websocket_server" -version = "0.2.0" -dependencies = [ - "futures 0.3.8", - "serde", - "serde_derive", - "slog", - "task_executor", - "tokio 0.3.4", - "types", - "ws", -] - [[package]] name = "wepoll-sys" version = "3.0.1" @@ -8226,24 +8175,6 @@ dependencies = [ "winapi 0.3.9", ] -[[package]] -name = "ws" -version = "0.9.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c51a2c47b5798ccc774ffb93ff536aec7c4275d722fd9c740c83cdd1af1f2d94" -dependencies = [ - "byteorder", - "bytes 0.4.12", - "httparse", - "log 0.4.11", - "mio 0.6.22", - "mio-extras", - "rand 0.7.3", - "sha-1 0.8.2", - "slab 0.4.2", - "url 2.2.0", -] - [[package]] name = "ws2_32-sys" version = "0.2.1" diff --git a/Cargo.toml b/Cargo.toml index 3d2973261..ec7b055b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,6 @@ members = [ "beacon_node/network", "beacon_node/store", "beacon_node/timer", - "beacon_node/websocket_server", "boot_node", diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index dd3c93812..5ab0427b8 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -42,7 +42,6 @@ tree_hash = "0.1.1" types = { path = "../../consensus/types" } tokio = "0.3.2" eth1 = { path = "../eth1" } -websocket_server = { path = "../websocket_server" } futures = "0.3.7" genesis = { path = "../genesis" } integer-sqrt = "0.1.5" @@ -56,7 +55,6 @@ bls = { path = "../../crypto/bls" } safe_arith = { path = "../../consensus/safe_arith" } fork_choice = { path = "../../consensus/fork_choice" } task_executor = { path = "../../common/task_executor" } -bus = "2.2.3" derivative = "2.1.1" itertools = "0.9.0" regex = "1.3.9" diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index aa744ad35..7cd90ad4e 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -10,7 +10,7 @@ use crate::block_verification::{ use crate::chain_config::ChainConfig; use crate::errors::{BeaconChainError as Error, BlockProductionError}; use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend}; -use crate::events::{EventHandler, EventKind}; +use crate::events::ServerSentEventHandler; use crate::head_tracker::HeadTracker; use crate::migrate::BackgroundMigrator; use crate::naive_aggregation_pool::{Error as NaiveAggregationError, NaiveAggregationPool}; @@ -27,6 +27,7 @@ use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::BeaconForkChoiceStore; use crate::BeaconSnapshot; use crate::{metrics, BeaconChainError}; +use eth2::types::{EventKind, SseBlock, SseFinalizedCheckpoint, SseHead}; use fork_choice::ForkChoice; use futures::channel::mpsc::Sender; use itertools::process_results; @@ -157,7 +158,6 @@ pub trait BeaconChainTypes: Send + Sync + 'static { type SlotClock: slot_clock::SlotClock; type Eth1Chain: Eth1ChainBackend; type EthSpec: types::EthSpec; - type EventHandler: EventHandler; } /// Represents the "Beacon Chain" component of Ethereum 2.0. Allows import of blocks and block @@ -214,8 +214,9 @@ pub struct BeaconChain { pub fork_choice: RwLock< ForkChoice, T::EthSpec>, >, - /// A handler for events generated by the beacon chain. - pub event_handler: T::EventHandler, + /// A handler for events generated by the beacon chain. This is only initialized when the + /// HTTP server is enabled. + pub event_handler: Option>, /// Used to track the heads of the beacon chain. pub(crate) head_tracker: Arc, /// A cache dedicated to block processing. @@ -955,17 +956,25 @@ impl BeaconChain { /// aggregation bit set. pub fn verify_unaggregated_attestation_for_gossip( &self, - attestation: Attestation, + unaggregated_attestation: Attestation, subnet_id: Option, ) -> Result, AttestationError> { metrics::inc_counter(&metrics::UNAGGREGATED_ATTESTATION_PROCESSING_REQUESTS); let _timer = metrics::start_timer(&metrics::UNAGGREGATED_ATTESTATION_GOSSIP_VERIFICATION_TIMES); - VerifiedUnaggregatedAttestation::verify(attestation, subnet_id, self).map(|v| { - metrics::inc_counter(&metrics::UNAGGREGATED_ATTESTATION_PROCESSING_SUCCESSES); - v - }) + VerifiedUnaggregatedAttestation::verify(unaggregated_attestation, subnet_id, self).map( + |v| { + // This method is called for API and gossip attestations, so this covers all unaggregated attestation events + if let Some(event_handler) = self.event_handler.as_ref() { + if event_handler.has_attestation_subscribers() { + event_handler.register(EventKind::Attestation(v.attestation().clone())); + } + } + metrics::inc_counter(&metrics::UNAGGREGATED_ATTESTATION_PROCESSING_SUCCESSES); + v + }, + ) } /// Accepts some `SignedAggregateAndProof` from the network and attempts to verify it, @@ -979,6 +988,12 @@ impl BeaconChain { metrics::start_timer(&metrics::AGGREGATED_ATTESTATION_GOSSIP_VERIFICATION_TIMES); VerifiedAggregatedAttestation::verify(signed_aggregate, self).map(|v| { + // This method is called for API and gossip attestations, so this covers all aggregated attestation events + if let Some(event_handler) = self.event_handler.as_ref() { + if event_handler.has_attestation_subscribers() { + event_handler.register(EventKind::Attestation(v.attestation().clone())); + } + } metrics::inc_counter(&metrics::AGGREGATED_ATTESTATION_PROCESSING_SUCCESSES); v }) @@ -1222,11 +1237,21 @@ impl BeaconChain { ) -> Result, Error> { // NOTE: this could be more efficient if it avoided cloning the head state let wall_clock_state = self.wall_clock_state()?; - Ok(self.observed_voluntary_exits.lock().verify_and_observe( - exit, - &wall_clock_state, - &self.spec, - )?) + Ok(self + .observed_voluntary_exits + .lock() + .verify_and_observe(exit, &wall_clock_state, &self.spec) + .map(|exit| { + // this method is called for both API and gossip exits, so this covers all exit events + if let Some(event_handler) = self.event_handler.as_ref() { + if event_handler.has_exit_subscribers() { + if let ObservationOutcome::New(exit) = exit.clone() { + event_handler.register(EventKind::VoluntaryExit(exit.into_inner())); + } + } + } + exit + })?) } /// Accept a pre-verified exit and queue it for inclusion in an appropriate block. @@ -1510,11 +1535,6 @@ impl BeaconChain { // Increment the Prometheus counter for block processing successes. metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES); - let _ = self.event_handler.register(EventKind::BeaconBlockImported { - block_root, - block: Box::new(block), - }); - Ok(block_root) } // There was an error whilst attempting to verify and import the block. The block might @@ -1525,12 +1545,6 @@ impl BeaconChain { "Beacon block processing error"; "error" => format!("{:?}", e), ); - - let _ = self.event_handler.register(EventKind::BeaconBlockRejected { - reason: format!("Internal error: {:?}", e), - block: Box::new(block), - }); - Err(BlockError::BeaconChainError(e)) } // The block failed verification. @@ -1540,12 +1554,6 @@ impl BeaconChain { "Beacon block rejected"; "reason" => other.to_string(), ); - - let _ = self.event_handler.register(EventKind::BeaconBlockRejected { - reason: format!("Invalid block: {}", other), - block: Box::new(block), - }); - Err(other) } } @@ -1664,7 +1672,7 @@ impl BeaconChain { ); crit!(self.log, "You must use the `--purge-db` flag to clear the database and restart sync. You may be on a hostile network."); shutdown_sender.try_send("Weak subjectivity checkpoint verification failed. Provided block root is not a checkpoint.") - .map_err(|err|BlockError::BeaconChainError(BeaconChainError::WeakSubjectivtyShutdownError(err)))?; + .map_err(|err| BlockError::BeaconChainError(BeaconChainError::WeakSubjectivtyShutdownError(err)))?; return Err(BlockError::WeakSubjectivityConflict); } } @@ -1745,6 +1753,16 @@ impl BeaconChain { self.head_tracker .register_block(block_root, parent_root, slot); + // send an event to the `events` endpoint after fully processing the block + if let Some(event_handler) = self.event_handler.as_ref() { + if event_handler.has_block_subscribers() { + event_handler.register(EventKind::Block(SseBlock { + slot, + block: block_root, + })); + } + } + metrics::stop_timer(db_write_timer); metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES); @@ -1993,7 +2011,7 @@ impl BeaconChain { "previous_slot" => current_head.slot, "new_head_parent" => format!("{}", new_head.beacon_block.parent_root()), "new_head" => format!("{}", beacon_block_root), - "new_slot" => new_head.beacon_block.slot() + "new_slot" => new_head.beacon_block.slot(), ); } else { debug!( @@ -2018,13 +2036,13 @@ impl BeaconChain { }); } - if current_head.slot.epoch(T::EthSpec::slots_per_epoch()) + let is_epoch_transition = current_head.slot.epoch(T::EthSpec::slots_per_epoch()) < new_head .beacon_state .slot - .epoch(T::EthSpec::slots_per_epoch()) - || is_reorg - { + .epoch(T::EthSpec::slots_per_epoch()); + + if is_epoch_transition || is_reorg { self.persist_head_and_fork_choice()?; self.op_pool.prune_attestations(self.epoch()?); self.ingest_slashings_to_op_pool(&new_head.beacon_state); @@ -2033,6 +2051,18 @@ impl BeaconChain { let update_head_timer = metrics::start_timer(&metrics::UPDATE_HEAD_TIMES); + // These fields are used for server-sent events + let state_root = new_head.beacon_state_root; + let head_slot = new_head.beacon_state.slot; + let target_epoch_start_slot = new_head + .beacon_state + .current_epoch() + .start_slot(T::EthSpec::slots_per_epoch()); + let prev_target_epoch_start_slot = new_head + .beacon_state + .previous_epoch() + .start_slot(T::EthSpec::slots_per_epoch()); + // Update the snapshot that stores the head of the chain at the time it received the // block. *self @@ -2092,11 +2122,37 @@ impl BeaconChain { self.after_finalization(&head.beacon_state, new_finalized_state_root)?; } - let _ = self.event_handler.register(EventKind::BeaconHeadChanged { - reorg: is_reorg, - previous_head_beacon_block_root: current_head.block_root, - current_head_beacon_block_root: beacon_block_root, - }); + // Register a server-sent event if necessary + if let Some(event_handler) = self.event_handler.as_ref() { + if event_handler.has_head_subscribers() { + if let Ok(Some(current_duty_dependent_root)) = + self.root_at_slot(target_epoch_start_slot - 1) + { + if let Ok(Some(previous_duty_dependent_root)) = + self.root_at_slot(prev_target_epoch_start_slot - 1) + { + event_handler.register(EventKind::Head(SseHead { + slot: head_slot, + block: beacon_block_root, + state: state_root, + current_duty_dependent_root, + previous_duty_dependent_root, + epoch_transition: is_epoch_transition, + })); + } else { + warn!( + self.log, + "Unable to find previous target root, cannot register head event" + ); + } + } else { + warn!( + self.log, + "Unable to find current target root, cannot register head event" + ); + } + } + } Ok(()) } @@ -2204,10 +2260,15 @@ impl BeaconChain { self.head_tracker.clone(), )?; - let _ = self.event_handler.register(EventKind::BeaconFinalization { - epoch: new_finalized_checkpoint.epoch, - root: new_finalized_checkpoint.root, - }); + if let Some(event_handler) = self.event_handler.as_ref() { + if event_handler.has_finalized_subscribers() { + event_handler.register(EventKind::FinalizedCheckpoint(SseFinalizedCheckpoint { + epoch: new_finalized_checkpoint.epoch, + block: new_finalized_checkpoint.root, + state: new_finalized_state_root, + })); + } + } Ok(()) } diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 0311755df..5276d1152 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -2,7 +2,6 @@ use crate::beacon_chain::{ BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, FORK_CHOICE_DB_KEY, OP_POOL_DB_KEY, }; use crate::eth1_chain::{CachingEth1Backend, SszEth1}; -use crate::events::NullEventHandler; use crate::head_tracker::HeadTracker; use crate::migrate::{BackgroundMigrator, MigratorConfig}; use crate::persisted_beacon_chain::PersistedBeaconChain; @@ -14,7 +13,7 @@ use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::ChainConfig; use crate::{ BeaconChain, BeaconChainTypes, BeaconForkChoiceStore, BeaconSnapshot, Eth1Chain, - Eth1ChainBackend, EventHandler, + Eth1ChainBackend, ServerSentEventHandler, }; use eth1::Config as Eth1Config; use fork_choice::ForkChoice; @@ -38,33 +37,24 @@ pub const PUBKEY_CACHE_FILENAME: &str = "pubkey_cache.ssz"; /// An empty struct used to "witness" all the `BeaconChainTypes` traits. It has no user-facing /// functionality and only exists to satisfy the type system. -pub struct Witness( - PhantomData<( - TSlotClock, - TEth1Backend, - TEthSpec, - TEventHandler, - THotStore, - TColdStore, - )>, +pub struct Witness( + PhantomData<(TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore)>, ); -impl BeaconChainTypes - for Witness +impl BeaconChainTypes + for Witness where THotStore: ItemStore + 'static, TColdStore: ItemStore + 'static, TSlotClock: SlotClock + 'static, TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, - TEventHandler: EventHandler + 'static, { type HotStore = THotStore; type ColdStore = TColdStore; type SlotClock = TSlotClock; type Eth1Chain = TEth1Backend; type EthSpec = TEthSpec; - type EventHandler = TEventHandler; } /// Builds a `BeaconChain` by either creating anew from genesis, or, resuming from an existing chain @@ -88,7 +78,7 @@ pub struct BeaconChainBuilder { >, op_pool: Option>, eth1_chain: Option>, - event_handler: Option, + event_handler: Option>, slot_clock: Option, shutdown_sender: Option>, head_tracker: Option, @@ -103,17 +93,14 @@ pub struct BeaconChainBuilder { slasher: Option>>, } -impl - BeaconChainBuilder< - Witness, - > +impl + BeaconChainBuilder> where THotStore: ItemStore + 'static, TColdStore: ItemStore + 'static, TSlotClock: SlotClock + 'static, TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, - TEventHandler: EventHandler + 'static, { /// Returns a new builder. /// @@ -377,9 +364,9 @@ where /// Sets the `BeaconChain` event handler backend. /// - /// For example, provide `WebSocketSender` as a `handler`. - pub fn event_handler(mut self, handler: TEventHandler) -> Self { - self.event_handler = Some(handler); + /// For example, provide `ServerSentEventHandler` as a `handler`. + pub fn event_handler(mut self, handler: Option>) -> Self { + self.event_handler = handler; self } @@ -425,9 +412,7 @@ where pub fn build( self, ) -> Result< - BeaconChain< - Witness, - >, + BeaconChain>, String, > { let log = self.log.ok_or("Cannot build without a logger")?; @@ -549,9 +534,7 @@ where genesis_block_root, genesis_state_root, fork_choice: RwLock::new(fork_choice), - event_handler: self - .event_handler - .ok_or("Cannot build without an event handler")?, + event_handler: self.event_handler, head_tracker: Arc::new(self.head_tracker.unwrap_or_default()), snapshot_cache: TimeoutRwLock::new(SnapshotCache::new( DEFAULT_SNAPSHOT_CACHE_SIZE, @@ -605,23 +588,15 @@ where } } -impl +impl BeaconChainBuilder< - Witness< - TSlotClock, - CachingEth1Backend, - TEthSpec, - TEventHandler, - THotStore, - TColdStore, - >, + Witness, TEthSpec, THotStore, TColdStore>, > where THotStore: ItemStore + 'static, TColdStore: ItemStore + 'static, TSlotClock: SlotClock + 'static, TEthSpec: EthSpec + 'static, - TEventHandler: EventHandler + 'static, { /// Do not use any eth1 backend. The client will not be able to produce beacon blocks. pub fn no_eth1_backend(self) -> Self { @@ -644,16 +619,13 @@ where } } -impl - BeaconChainBuilder< - Witness, - > +impl + BeaconChainBuilder> where THotStore: ItemStore + 'static, TColdStore: ItemStore + 'static, TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, - TEventHandler: EventHandler + 'static, { /// Sets the `BeaconChain` slot clock to `TestingSlotClock`. /// @@ -673,31 +645,6 @@ where } } -impl - BeaconChainBuilder< - Witness< - TSlotClock, - TEth1Backend, - TEthSpec, - NullEventHandler, - THotStore, - TColdStore, - >, - > -where - THotStore: ItemStore + 'static, - TColdStore: ItemStore + 'static, - TSlotClock: SlotClock + 'static, - TEth1Backend: Eth1ChainBackend + 'static, - TEthSpec: EthSpec + 'static, -{ - /// Sets the `BeaconChain` event handler to `NullEventHandler`. - pub fn null_event_handler(self) -> Self { - let handler = NullEventHandler::default(); - self.event_handler(handler) - } -} - fn genesis_block( genesis_state: &mut BeaconState, spec: &ChainSpec, @@ -767,7 +714,6 @@ mod test { .expect("should build state using recent genesis") .dummy_eth1_backend() .expect("should build the dummy eth1 backend") - .null_event_handler() .testing_slot_clock(Duration::from_secs(1)) .expect("should configure testing slot clock") .shutdown_sender(shutdown_tx) diff --git a/beacon_node/beacon_chain/src/events.rs b/beacon_node/beacon_chain/src/events.rs index f673813e6..b679b9977 100644 --- a/beacon_node/beacon_chain/src/events.rs +++ b/beacon_node/beacon_chain/src/events.rs @@ -1,147 +1,113 @@ -use bus::Bus; -use parking_lot::Mutex; -use serde_derive::{Deserialize, Serialize}; -use slog::{error, Logger}; -use std::marker::PhantomData; -use std::sync::Arc; -use types::{Attestation, Epoch, EthSpec, Hash256, SignedBeaconBlock, SignedBeaconBlockHash}; -pub use websocket_server::WebSocketSender; +pub use eth2::types::{EventKind, SseBlock, SseFinalizedCheckpoint, SseHead}; +use slog::{trace, Logger}; +use tokio::sync::broadcast; +use tokio::sync::broadcast::{error::SendError, Receiver, Sender}; +use types::EthSpec; -pub trait EventHandler: Sized + Send + Sync { - fn register(&self, kind: EventKind) -> Result<(), String>; -} +const DEFAULT_CHANNEL_CAPACITY: usize = 16; -pub struct NullEventHandler(PhantomData); - -impl EventHandler for WebSocketSender { - fn register(&self, kind: EventKind) -> Result<(), String> { - self.send_string( - serde_json::to_string(&kind) - .map_err(|e| format!("Unable to serialize event: {:?}", e))?, - ) - } -} - -pub struct ServerSentEvents { - // Bus<> is itself Sync + Send. We use Mutex<> here only because of the surrounding code does - // not enforce mutability statically (i.e. relies on interior mutability). - head_changed_queue: Arc>>, +pub struct ServerSentEventHandler { + attestation_tx: Sender>, + block_tx: Sender>, + finalized_tx: Sender>, + head_tx: Sender>, + exit_tx: Sender>, log: Logger, - _phantom: PhantomData, } -impl ServerSentEvents { - pub fn new(log: Logger) -> (Self, Arc>>) { - let bus = Bus::new(T::slots_per_epoch() as usize); - let mutex = Mutex::new(bus); - let arc = Arc::new(mutex); - let this = Self { - head_changed_queue: arc.clone(), +impl ServerSentEventHandler { + pub fn new(log: Logger) -> Self { + let (attestation_tx, _) = broadcast::channel(DEFAULT_CHANNEL_CAPACITY); + let (block_tx, _) = broadcast::channel(DEFAULT_CHANNEL_CAPACITY); + let (finalized_tx, _) = broadcast::channel(DEFAULT_CHANNEL_CAPACITY); + let (head_tx, _) = broadcast::channel(DEFAULT_CHANNEL_CAPACITY); + let (exit_tx, _) = broadcast::channel(DEFAULT_CHANNEL_CAPACITY); + + Self { + attestation_tx, + block_tx, + finalized_tx, + head_tx, + exit_tx, log, - _phantom: PhantomData, - }; - (this, arc) - } -} - -impl EventHandler for ServerSentEvents { - fn register(&self, kind: EventKind) -> Result<(), String> { - match kind { - EventKind::BeaconHeadChanged { - current_head_beacon_block_root, - .. - } => { - let mut guard = self.head_changed_queue.lock(); - if guard - .try_broadcast(current_head_beacon_block_root.into()) - .is_err() - { - error!( - self.log, - "Head change streaming queue full"; - "dropped_change" => format!("{}", current_head_beacon_block_root), - ); - } - Ok(()) - } - _ => Ok(()), } } -} -// An event handler that pushes events to both the websockets handler and the SSE handler. -// Named after the unix `tee` command. Meant as a temporary solution before ditching WebSockets -// completely once SSE functions well enough. -pub struct TeeEventHandler { - websockets_handler: WebSocketSender, - sse_handler: ServerSentEvents, -} + pub fn new_with_capacity(log: Logger, capacity: usize) -> Self { + let (attestation_tx, _) = broadcast::channel(capacity); + let (block_tx, _) = broadcast::channel(capacity); + let (finalized_tx, _) = broadcast::channel(capacity); + let (head_tx, _) = broadcast::channel(capacity); + let (exit_tx, _) = broadcast::channel(capacity); -impl TeeEventHandler { - #[allow(clippy::type_complexity)] - pub fn new( - log: Logger, - websockets_handler: WebSocketSender, - ) -> Result<(Self, Arc>>), String> { - let (sse_handler, bus) = ServerSentEvents::new(log); - let result = Self { - websockets_handler, - sse_handler, + Self { + attestation_tx, + block_tx, + finalized_tx, + head_tx, + exit_tx, + log, + } + } + + pub fn register(&self, kind: EventKind) { + let result = match kind { + EventKind::Attestation(attestation) => self + .attestation_tx + .send(EventKind::Attestation(attestation)) + .map(|count| trace!(self.log, "Registering server-sent attestation event"; "receiver_count" => count)), + EventKind::Block(block) => self.block_tx.send(EventKind::Block(block)) + .map(|count| trace!(self.log, "Registering server-sent block event"; "receiver_count" => count)), + EventKind::FinalizedCheckpoint(checkpoint) => self.finalized_tx + .send(EventKind::FinalizedCheckpoint(checkpoint)) + .map(|count| trace!(self.log, "Registering server-sent finalized checkpoint event"; "receiver_count" => count)), + EventKind::Head(head) => self.head_tx.send(EventKind::Head(head)) + .map(|count| trace!(self.log, "Registering server-sent head event"; "receiver_count" => count)), + EventKind::VoluntaryExit(exit) => self.exit_tx.send(EventKind::VoluntaryExit(exit)) + .map(|count| trace!(self.log, "Registering server-sent voluntary exit event"; "receiver_count" => count)), }; - Ok((result, bus)) + if let Err(SendError(event)) = result { + trace!(self.log, "No receivers registered to listen for event"; "event" => ?event); + } + } + + pub fn subscribe_attestation(&self) -> Receiver> { + self.attestation_tx.subscribe() + } + + pub fn subscribe_block(&self) -> Receiver> { + self.block_tx.subscribe() + } + + pub fn subscribe_finalized(&self) -> Receiver> { + self.finalized_tx.subscribe() + } + + pub fn subscribe_head(&self) -> Receiver> { + self.head_tx.subscribe() + } + + pub fn subscribe_exit(&self) -> Receiver> { + self.exit_tx.subscribe() + } + + pub fn has_attestation_subscribers(&self) -> bool { + self.attestation_tx.receiver_count() > 0 + } + + pub fn has_block_subscribers(&self) -> bool { + self.block_tx.receiver_count() > 0 + } + + pub fn has_finalized_subscribers(&self) -> bool { + self.finalized_tx.receiver_count() > 0 + } + + pub fn has_head_subscribers(&self) -> bool { + self.head_tx.receiver_count() > 0 + } + + pub fn has_exit_subscribers(&self) -> bool { + self.exit_tx.receiver_count() > 0 } } - -impl EventHandler for TeeEventHandler { - fn register(&self, kind: EventKind) -> Result<(), String> { - self.websockets_handler.register(kind.clone())?; - self.sse_handler.register(kind)?; - Ok(()) - } -} - -impl EventHandler for NullEventHandler { - fn register(&self, _kind: EventKind) -> Result<(), String> { - Ok(()) - } -} - -impl Default for NullEventHandler { - fn default() -> Self { - NullEventHandler(PhantomData) - } -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -#[serde( - bound = "T: EthSpec", - rename_all = "snake_case", - tag = "event", - content = "data" -)] -pub enum EventKind { - BeaconHeadChanged { - reorg: bool, - current_head_beacon_block_root: Hash256, - previous_head_beacon_block_root: Hash256, - }, - BeaconFinalization { - epoch: Epoch, - root: Hash256, - }, - BeaconBlockImported { - block_root: Hash256, - block: Box>, - }, - BeaconBlockRejected { - reason: String, - block: Box>, - }, - BeaconAttestationImported { - attestation: Box>, - }, - BeaconAttestationRejected { - reason: String, - attestation: Box>, - }, -} diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 37704a94e..b4a3ab040 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -36,7 +36,7 @@ pub use attestation_verification::Error as AttestationError; pub use beacon_fork_choice_store::{BeaconForkChoiceStore, Error as ForkChoiceStoreError}; pub use block_verification::{BlockError, GossipVerifiedBlock}; pub use eth1_chain::{Eth1Chain, Eth1ChainBackend}; -pub use events::EventHandler; +pub use events::ServerSentEventHandler; pub use metrics::scrape_for_metrics; pub use parking_lot; pub use slot_clock; diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 00c4f5412..a2794335b 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -7,8 +7,8 @@ pub use crate::{ use crate::{ builder::{BeaconChainBuilder, Witness}, eth1_chain::CachingEth1Backend, - events::NullEventHandler, - BeaconChain, BeaconChainTypes, BlockError, ChainConfig, StateSkipConfig, + BeaconChain, BeaconChainTypes, BlockError, ChainConfig, ServerSentEventHandler, + StateSkipConfig, }; use futures::channel::mpsc::Receiver; use genesis::interop_genesis_state; @@ -42,14 +42,8 @@ pub const HARNESS_GENESIS_TIME: u64 = 1_567_552_690; // This parameter is required by a builder but not used because we use the `TestingSlotClock`. pub const HARNESS_SLOT_TIME: Duration = Duration::from_secs(1); -pub type BaseHarnessType = Witness< - TestingSlotClock, - CachingEth1Backend, - TEthSpec, - NullEventHandler, - THotStore, - TColdStore, ->; +pub type BaseHarnessType = + Witness, TEthSpec, THotStore, TColdStore>; pub type DiskHarnessType = BaseHarnessType, LevelDB>; pub type EphemeralHarnessType = BaseHarnessType, MemoryStore>; @@ -188,7 +182,7 @@ impl BeaconChainHarness> { let store = HotColdDB::open_ephemeral(store_config, spec.clone(), log.clone()).unwrap(); let chain = BeaconChainBuilder::new(eth_spec_instance) - .logger(log) + .logger(log.clone()) .custom_spec(spec.clone()) .store(Arc::new(store)) .store_migrator_config(MigratorConfig::default().blocking()) @@ -200,11 +194,11 @@ impl BeaconChainHarness> { .expect("should build state using recent genesis") .dummy_eth1_backend() .expect("should build dummy backend") - .null_event_handler() .testing_slot_clock(HARNESS_SLOT_TIME) .expect("should configure testing slot clock") .shutdown_sender(shutdown_tx) .chain_config(chain_config) + .event_handler(Some(ServerSentEventHandler::new_with_capacity(log, 1))) .build() .expect("should build"); @@ -246,7 +240,6 @@ impl BeaconChainHarness> { .expect("should build state using recent genesis") .dummy_eth1_backend() .expect("should build dummy backend") - .null_event_handler() .testing_slot_clock(HARNESS_SLOT_TIME) .expect("should configure testing slot clock") .shutdown_sender(shutdown_tx) @@ -288,7 +281,6 @@ impl BeaconChainHarness> { .expect("should resume beacon chain from db") .dummy_eth1_backend() .expect("should build dummy backend") - .null_event_handler() .testing_slot_clock(Duration::from_secs(1)) .expect("should configure testing slot clock") .shutdown_sender(shutdown_tx) diff --git a/beacon_node/client/Cargo.toml b/beacon_node/client/Cargo.toml index 443ff28de..cc9197eb9 100644 --- a/beacon_node/client/Cargo.toml +++ b/beacon_node/client/Cargo.toml @@ -15,7 +15,6 @@ network = { path = "../network" } timer = { path = "../timer" } eth2_libp2p = { path = "../eth2_libp2p" } parking_lot = "0.11.0" -websocket_server = { path = "../websocket_server" } prometheus = "0.10.0" types = { path = "../../consensus/types" } tree_hash = "0.1.1" @@ -40,7 +39,6 @@ eth2_ssz = "0.1.2" lazy_static = "1.4.0" lighthouse_metrics = { path = "../../common/lighthouse_metrics" } time = "0.2.22" -bus = "2.2.3" directory = {path = "../../common/directory"} http_api = { path = "../http_api" } http_metrics = { path = "../http_metrics" } diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 0b6d2b0aa..cbffed970 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -1,36 +1,28 @@ use crate::config::{ClientGenesis, Config as ClientConfig}; use crate::notifier::spawn_notifier; use crate::Client; -use beacon_chain::events::TeeEventHandler; use beacon_chain::{ builder::{BeaconChainBuilder, Witness}, eth1_chain::{CachingEth1Backend, Eth1Chain}, slot_clock::{SlotClock, SystemTimeSlotClock}, store::{HotColdDB, ItemStore, LevelDB, StoreConfig}, - BeaconChain, BeaconChainTypes, Eth1ChainBackend, EventHandler, + BeaconChain, BeaconChainTypes, Eth1ChainBackend, ServerSentEventHandler, }; -use bus::Bus; use environment::RuntimeContext; use eth1::{Config as Eth1Config, Service as Eth1Service}; use eth2_libp2p::NetworkGlobals; use genesis::{interop_genesis_state, Eth1GenesisService}; use network::{NetworkConfig, NetworkMessage, NetworkService}; -use parking_lot::Mutex; use slasher::{Slasher, SlasherServer}; use slog::{debug, info, warn}; use ssz::Decode; -use std::net::SocketAddr; use std::net::TcpListener; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; use timer::spawn_timer; use tokio::sync::{mpsc::UnboundedSender, oneshot}; -use types::{ - test_utils::generate_deterministic_keypairs, BeaconState, ChainSpec, EthSpec, - SignedBeaconBlockHash, -}; -use websocket_server::{Config as WebSocketConfig, WebSocketSender}; +use types::{test_utils::generate_deterministic_keypairs, BeaconState, ChainSpec, EthSpec}; /// Interval between polling the eth1 node for genesis information. pub const ETH1_GENESIS_UPDATE_INTERVAL_MILLIS: u64 = 7_000; @@ -57,25 +49,22 @@ pub struct ClientBuilder { beacon_chain_builder: Option>, beacon_chain: Option>>, eth1_service: Option, - event_handler: Option, network_globals: Option>>, network_send: Option>>, db_path: Option, freezer_db_path: Option, http_api_config: http_api::Config, http_metrics_config: http_metrics::Config, - websocket_listen_addr: Option, slasher: Option>>, eth_spec_instance: T::EthSpec, } -impl - ClientBuilder> +impl + ClientBuilder> where TSlotClock: SlotClock + Clone + 'static, TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, - TEventHandler: EventHandler + 'static, THotStore: ItemStore + 'static, TColdStore: ItemStore + 'static, { @@ -91,14 +80,12 @@ where beacon_chain_builder: None, beacon_chain: None, eth1_service: None, - event_handler: None, network_globals: None, network_send: None, db_path: None, freezer_db_path: None, http_api_config: <_>::default(), http_metrics_config: <_>::default(), - websocket_listen_addr: None, slasher: None, eth_spec_instance, } @@ -142,6 +129,11 @@ where .ok_or("beacon_chain_start_method requires a runtime context")? .service_context("beacon".into()); let spec = chain_spec.ok_or("beacon_chain_start_method requires a chain spec")?; + let event_handler = if self.http_api_config.enabled { + Some(ServerSentEventHandler::new(context.log().clone())) + } else { + None + }; let builder = BeaconChainBuilder::new(eth_spec_instance) .logger(context.log().clone()) @@ -150,7 +142,8 @@ where .custom_spec(spec.clone()) .chain_config(chain_config) .disabled_forks(disabled_forks) - .graffiti(graffiti); + .graffiti(graffiti) + .event_handler(event_handler); let builder = if let Some(slasher) = self.slasher.clone() { builder.slasher(slasher) @@ -224,14 +217,7 @@ where #[allow(clippy::type_complexity)] let ctx: Arc< http_api::Context< - Witness< - TSlotClock, - TEth1Backend, - TEthSpec, - TEventHandler, - THotStore, - TColdStore, - >, + Witness, >, > = Arc::new(http_api::Context { config: self.http_api_config.clone(), @@ -419,10 +405,8 @@ where #[allow(clippy::type_complexity)] pub fn build( self, - ) -> Result< - Client>, - String, - > { + ) -> Result>, String> + { let runtime_context = self .runtime_context .as_ref() @@ -494,18 +478,16 @@ where network_globals: self.network_globals, http_api_listen_addr, http_metrics_listen_addr, - websocket_listen_addr: self.websocket_listen_addr, }) } } -impl - ClientBuilder> +impl + ClientBuilder> where TSlotClock: SlotClock + Clone + 'static, TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, - TEventHandler: EventHandler + 'static, THotStore: ItemStore + 'static, TColdStore: ItemStore + 'static, { @@ -520,10 +502,6 @@ where let chain = self .beacon_chain_builder .ok_or("beacon_chain requires a beacon_chain_builder")? - .event_handler( - self.event_handler - .ok_or("beacon_chain requires an event handler")?, - ) .slot_clock( self.slot_clock .clone() @@ -535,75 +513,18 @@ where self.beacon_chain = Some(Arc::new(chain)); self.beacon_chain_builder = None; - self.event_handler = None; // a beacon chain requires a timer self.timer() } } -impl - ClientBuilder< - Witness< - TSlotClock, - TEth1Backend, - TEthSpec, - TeeEventHandler, - THotStore, - TColdStore, - >, - > +impl + ClientBuilder, LevelDB>> where TSlotClock: SlotClock + 'static, TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, - THotStore: ItemStore + 'static, - TColdStore: ItemStore + 'static, -{ - #[allow(clippy::type_complexity)] - /// Specifies that the `BeaconChain` should publish events using the WebSocket server. - pub fn tee_event_handler( - mut self, - config: WebSocketConfig, - ) -> Result<(Self, Arc>>), String> { - let context = self - .runtime_context - .as_ref() - .ok_or("tee_event_handler requires a runtime_context")? - .service_context("ws".into()); - - let log = context.log().clone(); - let (sender, listening_addr): (WebSocketSender, Option<_>) = if config.enabled { - let (sender, listening_addr) = - websocket_server::start_server(context.executor, &config)?; - (sender, Some(listening_addr)) - } else { - (WebSocketSender::dummy(), None) - }; - - self.websocket_listen_addr = listening_addr; - let (tee_event_handler, bus) = TeeEventHandler::new(log, sender)?; - self.event_handler = Some(tee_event_handler); - Ok((self, bus)) - } -} - -impl - ClientBuilder< - Witness< - TSlotClock, - TEth1Backend, - TEthSpec, - TEventHandler, - LevelDB, - LevelDB, - >, - > -where - TSlotClock: SlotClock + 'static, - TEth1Backend: Eth1ChainBackend + 'static, - TEthSpec: EthSpec + 'static, - TEventHandler: EventHandler + 'static, { /// Specifies that the `Client` should use a `HotColdDB` database. pub fn disk_store( @@ -632,21 +553,13 @@ where } } -impl +impl ClientBuilder< - Witness< - TSlotClock, - CachingEth1Backend, - TEthSpec, - TEventHandler, - THotStore, - TColdStore, - >, + Witness, TEthSpec, THotStore, TColdStore>, > where TSlotClock: SlotClock + 'static, TEthSpec: EthSpec + 'static, - TEventHandler: EventHandler + 'static, THotStore: ItemStore + 'static, TColdStore: ItemStore + 'static, { @@ -743,14 +656,11 @@ where } } -impl - ClientBuilder< - Witness, - > +impl + ClientBuilder> where TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, - TEventHandler: EventHandler + 'static, THotStore: ItemStore + 'static, TColdStore: ItemStore + 'static, { diff --git a/beacon_node/client/src/config.rs b/beacon_node/client/src/config.rs index de24f5b64..c4516e642 100644 --- a/beacon_node/client/src/config.rs +++ b/beacon_node/client/src/config.rs @@ -59,7 +59,6 @@ pub struct Config { pub store: store::StoreConfig, pub network: network::NetworkConfig, pub chain: beacon_chain::ChainConfig, - pub websocket_server: websocket_server::Config, pub eth1: eth1::Config, pub http_api: http_api::Config, pub http_metrics: http_metrics::Config, @@ -77,7 +76,6 @@ impl Default for Config { store: <_>::default(), network: NetworkConfig::default(), chain: <_>::default(), - websocket_server: <_>::default(), dummy_eth1_backend: false, sync_eth1_chain: false, eth1: <_>::default(), diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs index 6b721aee9..a59b02538 100644 --- a/beacon_node/client/src/lib.rs +++ b/beacon_node/client/src/lib.rs @@ -27,7 +27,6 @@ pub struct Client { http_api_listen_addr: Option, /// Listen address for the HTTP server which serves Prometheus metrics. http_metrics_listen_addr: Option, - websocket_listen_addr: Option, } impl Client { @@ -46,11 +45,6 @@ impl Client { self.http_metrics_listen_addr } - /// Returns the address of the client's WebSocket API server, if it was started. - pub fn websocket_listen_addr(&self) -> Option { - self.websocket_listen_addr - } - /// Returns the port of the client's libp2p stack, if it was started. pub fn libp2p_listen_port(&self) -> Option { self.network_globals.as_ref().map(|n| n.listen_port_tcp()) diff --git a/beacon_node/http_api/Cargo.toml b/beacon_node/http_api/Cargo.toml index d8784ea5c..29b5c6992 100644 --- a/beacon_node/http_api/Cargo.toml +++ b/beacon_node/http_api/Cargo.toml @@ -7,7 +7,7 @@ edition = "2018" [dependencies] warp = { git = "https://github.com/sigp/warp ", branch = "lighthouse" } serde = { version = "1.0.116", features = ["derive"] } -tokio = { version = "0.3.2", features = ["macros"] } +tokio = { version = "0.3.2", features = ["macros","stream","sync"] } parking_lot = "0.11.0" types = { path = "../../consensus/types" } hex = "0.4.2" @@ -26,6 +26,7 @@ warp_utils = { path = "../../common/warp_utils" } slot_clock = { path = "../../common/slot_clock" } eth2_ssz = { path = "../../consensus/ssz" } bs58 = "0.3.1" +futures = "0.3.8" [dev-dependencies] store = { path = "../store" } diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 159a3d2f0..5524a290c 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -17,7 +17,7 @@ use beacon_chain::{ }; use beacon_proposer_cache::BeaconProposerCache; use block_id::BlockId; -use eth2::types::{self as api_types, ValidatorId}; +use eth2::types::{self as api_types, EventKind, ValidatorId}; use eth2_libp2p::{types::SyncState, EnrExt, NetworkGlobals, PeerId, PubsubMessage}; use lighthouse_version::version_with_platform; use network::NetworkMessage; @@ -33,6 +33,8 @@ use std::convert::TryInto; use std::future::Future; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::sync::Arc; +use tokio::stream::{StreamExt, StreamMap}; +use tokio::sync::broadcast::error::RecvError; use tokio::sync::mpsc::UnboundedSender; use types::{ Attestation, AttestationDuty, AttesterSlashing, CloneConfig, CommitteeCache, Epoch, EthSpec, @@ -40,7 +42,9 @@ use types::{ SignedBeaconBlock, SignedVoluntaryExit, Slot, YamlConfig, }; use warp::http::StatusCode; -use warp::{http::Response, Filter}; +use warp::sse::ServerSentEvent; +use warp::{http::Response, Filter, Stream}; +use warp_utils::reject::ServerSentEventError; use warp_utils::task::{blocking_json_task, blocking_task}; const API_PREFIX: &str = "eth"; @@ -1571,15 +1575,37 @@ pub fn serve( } if epoch == current_epoch { + let dependent_root_slot = current_epoch + .start_slot(T::EthSpec::slots_per_epoch()) - 1; + let dependent_root = if dependent_root_slot > chain.best_slot().map_err(warp_utils::reject::beacon_chain_error)? { + chain.head_beacon_block_root().map_err(warp_utils::reject::beacon_chain_error)? + } else { + chain + .root_at_slot(dependent_root_slot) + .map_err(warp_utils::reject::beacon_chain_error)? + .unwrap_or(chain.genesis_block_root) + }; + beacon_proposer_cache .lock() .get_proposers(&chain, epoch) - .map(api_types::GenericResponse::from) + .map(|duties| api_types::DutiesResponse{ data: duties, dependent_root} ) } else { let state = StateId::slot(epoch.start_slot(T::EthSpec::slots_per_epoch())) .state(&chain)?; + let dependent_root_slot = state.current_epoch() + .start_slot(T::EthSpec::slots_per_epoch()) - 1; + let dependent_root = if dependent_root_slot > chain.best_slot().map_err(warp_utils::reject::beacon_chain_error)? { + chain.head_beacon_block_root().map_err(warp_utils::reject::beacon_chain_error)? + } else { + chain + .root_at_slot(dependent_root_slot) + .map_err(warp_utils::reject::beacon_chain_error)? + .unwrap_or(chain.genesis_block_root) + }; + epoch .slot_iter(T::EthSpec::slots_per_epoch()) .map(|slot| { @@ -1604,7 +1630,13 @@ pub fn serve( }) }) .collect::, _>>() - .map(api_types::GenericResponse::from) + .map(|duties| { + + api_types::DutiesResponse{ + dependent_root, + data: duties, + } + }) } }) }, @@ -1781,9 +1813,9 @@ pub fn serve( // // The idea is to stop historical requests from washing out the cache on the // beacon chain, whilst allowing a VC to request duties quickly. - let duties = if epoch == current_epoch { + let (duties, dependent_root) = if epoch == current_epoch { // Fast path. - pubkeys + let duties = pubkeys .into_iter() // Exclude indices which do not represent a known public key and a // validator duty. @@ -1796,7 +1828,26 @@ pub fn serve( .map(|duty| convert(i, pubkey, duty)), ) }) - .collect::, warp::Rejection>>()? + .collect::, warp::Rejection>>()?; + + let dependent_root_slot = + (epoch - 1).start_slot(T::EthSpec::slots_per_epoch()) - 1; + let dependent_root = if dependent_root_slot + > chain + .best_slot() + .map_err(warp_utils::reject::beacon_chain_error)? + { + chain + .head_beacon_block_root() + .map_err(warp_utils::reject::beacon_chain_error)? + } else { + chain + .root_at_slot(dependent_root_slot) + .map_err(warp_utils::reject::beacon_chain_error)? + .unwrap_or(chain.genesis_block_root) + }; + + (duties, dependent_root) } else { // If the head state is equal to or earlier than the request epoch, use it. let mut state = chain @@ -1843,7 +1894,7 @@ pub fn serve( state .build_committee_cache(relative_epoch, &chain.spec) .map_err(warp_utils::reject::beacon_state_error)?; - pubkeys + let duties = pubkeys .into_iter() .filter_map(|(i, pubkey)| { Some( @@ -1854,10 +1905,32 @@ pub fn serve( .map(|duty| convert(i, pubkey, duty)), ) }) - .collect::, warp::Rejection>>()? + .collect::, warp::Rejection>>()?; + + let dependent_root_slot = + (epoch - 1).start_slot(T::EthSpec::slots_per_epoch()) - 1; + let dependent_root = if dependent_root_slot + > chain + .best_slot() + .map_err(warp_utils::reject::beacon_chain_error)? + { + chain + .head_beacon_block_root() + .map_err(warp_utils::reject::beacon_chain_error)? + } else { + chain + .root_at_slot(dependent_root_slot) + .map_err(warp_utils::reject::beacon_chain_error)? + .unwrap_or(chain.genesis_block_root) + }; + + (duties, dependent_root) }; - Ok(api_types::GenericResponse::from(duties)) + Ok(api_types::DutiesResponse { + dependent_root, + data: duties, + }) }) }, ); @@ -2190,7 +2263,7 @@ pub fn serve( let get_lighthouse_staking = warp::path("lighthouse") .and(warp::path("staking")) .and(warp::path::end()) - .and(chain_filter) + .and(chain_filter.clone()) .and_then(|chain: Arc>| { blocking_json_task(move || { if chain.eth1_chain.is_some() { @@ -2205,6 +2278,67 @@ pub fn serve( }) }); + fn merge_streams( + stream_map: StreamMap< + String, + impl Stream, RecvError>> + Unpin + Send + 'static, + >, + ) -> impl Stream> + + Send + + 'static { + // Convert messages into Server-Sent Events and return resulting stream. + stream_map.map(move |(topic_name, msg)| match msg { + Ok(data) => Ok((warp::sse::event(topic_name), warp::sse::json(data)).boxed()), + Err(e) => Err(warp_utils::reject::server_sent_event_error(format!( + "{:?}", + e + ))), + }) + } + + let get_events = eth1_v1 + .and(warp::path("events")) + .and(warp::path::end()) + .and(warp::query::()) + .and(chain_filter) + .and_then( + |topics: api_types::EventQuery, chain: Arc>| { + blocking_task(move || { + // for each topic subscribed spawn a new subscription + let mut stream_map = StreamMap::with_capacity(topics.topics.0.len()); + + if let Some(event_handler) = chain.event_handler.as_ref() { + for topic in topics.topics.0.clone() { + let receiver = match topic { + api_types::EventTopic::Head => event_handler.subscribe_head(), + api_types::EventTopic::Block => event_handler.subscribe_block(), + api_types::EventTopic::Attestation => { + event_handler.subscribe_attestation() + } + api_types::EventTopic::VoluntaryExit => { + event_handler.subscribe_exit() + } + api_types::EventTopic::FinalizedCheckpoint => { + event_handler.subscribe_finalized() + } + }; + stream_map.insert(topic.to_string(), Box::pin(receiver.into_stream())); + } + } else { + return Err(warp_utils::reject::custom_server_error( + "event handler was not initialized".to_string(), + )); + } + + let stream = merge_streams(stream_map); + + Ok::<_, warp::Rejection>(warp::sse::reply( + warp::sse::keep_alive().stream(stream), + )) + }) + }, + ); + // Define the ultimate set of routes that will be provided to the server. let routes = warp::get() .and( @@ -2253,7 +2387,8 @@ pub fn serve( .or(get_lighthouse_eth1_block_cache.boxed()) .or(get_lighthouse_eth1_deposit_cache.boxed()) .or(get_lighthouse_beacon_states_ssz.boxed()) - .or(get_lighthouse_staking.boxed()), + .or(get_lighthouse_staking.boxed()) + .or(get_events.boxed()), ) .or(warp::post().and( post_beacon_blocks diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 430a243d0..3a426b391 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -14,14 +14,17 @@ use eth2_libp2p::{ types::{EnrBitfield, SyncState}, Enr, EnrExt, NetworkGlobals, PeerId, }; +use futures::stream::{Stream, StreamExt}; use http_api::{Config, Context}; use network::NetworkMessage; use state_processing::per_slot_processing; use std::convert::TryInto; +use std::iter::Iterator; use std::net::Ipv4Addr; use std::sync::Arc; use tokio::sync::mpsc; use tokio::sync::oneshot; +use tokio::time::Duration; use tokio_compat_02::FutureExt; use tree_hash::TreeHash; use types::{ @@ -33,7 +36,7 @@ type E = MainnetEthSpec; const SLOTS_PER_EPOCH: u64 = 32; const VALIDATOR_COUNT: usize = SLOTS_PER_EPOCH as usize; -const CHAIN_LENGTH: u64 = SLOTS_PER_EPOCH * 5; +const CHAIN_LENGTH: u64 = SLOTS_PER_EPOCH * 5 - 1; // Make `next_block` an epoch transition const JUSTIFIED_EPOCH: u64 = 4; const FINALIZED_EPOCH: u64 = 3; const TCP_PORT: u16 = 42; @@ -131,7 +134,7 @@ impl ApiTester { assert_eq!( chain.head_info().unwrap().finalized_checkpoint.epoch, - 3, + 2, "precondition: finality" ); assert_eq!( @@ -140,7 +143,7 @@ impl ApiTester { .unwrap() .current_justified_checkpoint .epoch, - 4, + 3, "precondition: justification" ); @@ -218,6 +221,111 @@ impl ApiTester { } } + pub fn new_from_genesis() -> Self { + let harness = BeaconChainHarness::new( + MainnetEthSpec, + generate_deterministic_keypairs(VALIDATOR_COUNT), + ); + + harness.advance_slot(); + + let head = harness.chain.head().unwrap(); + + let (next_block, _next_state) = + harness.make_block(head.beacon_state.clone(), harness.chain.slot().unwrap()); + + let attestations = harness + .get_unaggregated_attestations( + &AttestationStrategy::AllValidators, + &head.beacon_state, + head.beacon_block_root, + harness.chain.slot().unwrap(), + ) + .into_iter() + .map(|vec| vec.into_iter().map(|(attestation, _subnet_id)| attestation)) + .flatten() + .collect::>(); + + let attester_slashing = harness.make_attester_slashing(vec![0, 1]); + let proposer_slashing = harness.make_proposer_slashing(2); + let voluntary_exit = harness.make_voluntary_exit(3, harness.chain.epoch().unwrap()); + + let chain = Arc::new(harness.chain); + + let (network_tx, network_rx) = mpsc::unbounded_channel(); + + let log = null_logger().unwrap(); + + // Default metadata + let meta_data = MetaData { + seq_number: SEQ_NUMBER, + attnets: EnrBitfield::::default(), + }; + let enr_key = CombinedKey::generate_secp256k1(); + let enr = EnrBuilder::new("v4").build(&enr_key).unwrap(); + let enr_clone = enr.clone(); + let network_globals = NetworkGlobals::new(enr, TCP_PORT, UDP_PORT, meta_data, vec![], &log); + + let peer_id = PeerId::random(); + network_globals.peers.write().connect_ingoing( + &peer_id, + EXTERNAL_ADDR.parse().unwrap(), + None, + ); + + *network_globals.sync_state.write() = SyncState::Synced; + + let eth1_service = + eth1::Service::new(eth1::Config::default(), log.clone(), chain.spec.clone()); + + let context = Arc::new(Context { + config: Config { + enabled: true, + listen_addr: Ipv4Addr::new(127, 0, 0, 1), + listen_port: 0, + allow_origin: None, + }, + chain: Some(chain.clone()), + network_tx: Some(network_tx), + network_globals: Some(Arc::new(network_globals)), + eth1_service: Some(eth1_service), + log, + }); + let ctx = context.clone(); + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let server_shutdown = async { + // It's not really interesting why this triggered, just that it happened. + let _ = shutdown_rx.await; + }; + let (listening_socket, server) = http_api::serve(ctx, server_shutdown).unwrap(); + + tokio::spawn(async { server.await }); + + let client = BeaconNodeHttpClient::new( + Url::parse(&format!( + "http://{}:{}", + listening_socket.ip(), + listening_socket.port() + )) + .unwrap(), + ); + + Self { + chain, + client, + next_block, + attestations, + attester_slashing, + proposer_slashing, + voluntary_exit, + _server_shutdown: shutdown_tx, + validator_keypairs: harness.validator_keypairs, + network_rx, + local_enr: enr_clone, + external_peer_id: peer_id, + } + } + fn skip_slots(self, count: u64) -> Self { for _ in 0..count { self.chain @@ -1376,8 +1484,17 @@ impl ApiTester { .client .post_validator_duties_attester(epoch, indices.as_slice()) .await + .unwrap(); + + let dependent_root = self + .chain + .root_at_slot((epoch - 1).start_slot(E::slots_per_epoch()) - 1) .unwrap() - .data; + .unwrap_or(self.chain.head_beacon_block_root().unwrap()); + + assert_eq!(results.dependent_root, dependent_root); + + let result_duties = results.data; let mut state = self .chain @@ -1395,7 +1512,7 @@ impl ApiTester { .filter(|i| **i < state.validators.len() as u64) .count(); - assert_eq!(results.len(), expected_len); + assert_eq!(result_duties.len(), expected_len); for (indices_set, &i) in indices.iter().enumerate() { if let Some(duty) = state @@ -1412,7 +1529,7 @@ impl ApiTester { slot: duty.slot, }; - let result = results + let result = result_duties .iter() .find(|duty| duty.validator_index == i) .unwrap(); @@ -1424,7 +1541,7 @@ impl ApiTester { ); } else { assert!( - !results.iter().any(|duty| duty.validator_index == i), + !result_duties.iter().any(|duty| duty.validator_index == i), "validator index should not exist in response" ); } @@ -1438,12 +1555,17 @@ impl ApiTester { pub async fn test_get_validator_duties_proposer(self) -> Self { let current_epoch = self.chain.epoch().unwrap(); + let dependent_root = self + .chain + .root_at_slot(current_epoch.start_slot(E::slots_per_epoch()) - 1) + .unwrap() + .unwrap_or(self.chain.head_beacon_block_root().unwrap()); + let result = self .client .get_validator_duties_proposer(current_epoch) .await - .unwrap() - .data; + .unwrap(); let mut state = self.chain.head_beacon_state().unwrap(); @@ -1455,7 +1577,7 @@ impl ApiTester { .build_committee_cache(RelativeEpoch::Current, &self.chain.spec) .unwrap(); - let expected = current_epoch + let expected_duties = current_epoch .slot_iter(E::slots_per_epoch()) .map(|slot| { let index = state @@ -1471,6 +1593,11 @@ impl ApiTester { }) .collect::>(); + let expected = DutiesResponse { + data: expected_duties, + dependent_root, + }; + assert_eq!(result, expected); self @@ -1824,6 +1951,185 @@ impl ApiTester { self } + + pub async fn test_get_events(self) -> Self { + // Subscribe to all events + let topics = vec![ + EventTopic::Attestation, + EventTopic::VoluntaryExit, + EventTopic::Block, + EventTopic::Head, + EventTopic::FinalizedCheckpoint, + ]; + let mut events_future = self + .client + .get_events::(topics.as_slice()) + .await + .unwrap(); + + let expected_attestation_len = self.attestations.len(); + + self.client + .post_beacon_pool_attestations(self.attestations.as_slice()) + .await + .unwrap(); + + let attestation_events = poll_events( + &mut events_future, + expected_attestation_len, + Duration::from_millis(10000), + ) + .await; + assert_eq!( + attestation_events.as_slice(), + self.attestations + .clone() + .into_iter() + .map(|attestation| EventKind::Attestation(attestation)) + .collect::>() + .as_slice() + ); + + // Produce a voluntary exit event + self.client + .post_beacon_pool_voluntary_exits(&self.voluntary_exit) + .await + .unwrap(); + + let exit_events = poll_events(&mut events_future, 1, Duration::from_millis(10000)).await; + assert_eq!( + exit_events.as_slice(), + &[EventKind::VoluntaryExit(self.voluntary_exit.clone())] + ); + + // Submit the next block, which is on an epoch boundary, so this will produce a finalized + // checkpoint event, head event, and block event + let block_root = self.next_block.canonical_root(); + + // current_duty_dependent_root = block root because this is the first slot of the epoch + let current_duty_dependent_root = self.chain.head_beacon_block_root().unwrap(); + let current_slot = self.chain.slot().unwrap(); + let next_slot = self.next_block.slot(); + let finalization_distance = E::slots_per_epoch() * 2; + + let expected_block = EventKind::Block(SseBlock { + block: block_root, + slot: next_slot, + }); + + let expected_head = EventKind::Head(SseHead { + block: block_root, + slot: next_slot, + state: self.next_block.state_root(), + current_duty_dependent_root, + previous_duty_dependent_root: self + .chain + .root_at_slot(current_slot - E::slots_per_epoch()) + .unwrap() + .unwrap(), + epoch_transition: true, + }); + + let expected_finalized = EventKind::FinalizedCheckpoint(SseFinalizedCheckpoint { + block: self + .chain + .root_at_slot(next_slot - finalization_distance) + .unwrap() + .unwrap(), + state: self + .chain + .state_root_at_slot(next_slot - finalization_distance) + .unwrap() + .unwrap(), + epoch: Epoch::new(3), + }); + + self.client + .post_beacon_blocks(&self.next_block) + .await + .unwrap(); + + let block_events = poll_events(&mut events_future, 3, Duration::from_millis(10000)).await; + assert_eq!( + block_events.as_slice(), + &[expected_block, expected_finalized, expected_head] + ); + + self + } + + pub async fn test_get_events_from_genesis(self) -> Self { + let topics = vec![EventTopic::Block, EventTopic::Head]; + let mut events_future = self + .client + .get_events::(topics.as_slice()) + .await + .unwrap(); + + let block_root = self.next_block.canonical_root(); + let next_slot = self.next_block.slot(); + + let expected_block = EventKind::Block(SseBlock { + block: block_root, + slot: next_slot, + }); + + let expected_head = EventKind::Head(SseHead { + block: block_root, + slot: next_slot, + state: self.next_block.state_root(), + current_duty_dependent_root: self.chain.genesis_block_root, + previous_duty_dependent_root: self.chain.genesis_block_root, + epoch_transition: false, + }); + + self.client + .post_beacon_blocks(&self.next_block) + .await + .unwrap(); + + let block_events = poll_events(&mut events_future, 2, Duration::from_millis(10000)).await; + assert_eq!(block_events.as_slice(), &[expected_block, expected_head]); + + self + } +} + +async fn poll_events, eth2::Error>> + Unpin, T: EthSpec>( + stream: &mut S, + num_events: usize, + timeout: Duration, +) -> Vec> { + let mut events = Vec::new(); + + let collect_stream_fut = async { + loop { + if let Some(result) = stream.next().await { + events.push(result.unwrap()); + if events.len() == num_events { + return; + } + } + } + }; + + tokio::select! { + _ = collect_stream_fut => {return events} + _ = tokio::time::sleep(timeout) => { return events; } + } +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn get_events() { + ApiTester::new().test_get_events().compat().await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn get_events_from_genesis() { + ApiTester::new_from_genesis() + .test_get_events_from_genesis() + .compat() + .await; } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] diff --git a/beacon_node/network/src/attestation_service/tests/mod.rs b/beacon_node/network/src/attestation_service/tests/mod.rs index e0d6da5d5..e4173cbdb 100644 --- a/beacon_node/network/src/attestation_service/tests/mod.rs +++ b/beacon_node/network/src/attestation_service/tests/mod.rs @@ -4,7 +4,6 @@ mod tests { use beacon_chain::{ builder::{BeaconChainBuilder, Witness}, eth1_chain::CachingEth1Backend, - events::NullEventHandler, }; use futures::Stream; use genesis::{generate_deterministic_keypairs, interop_genesis_state}; @@ -25,7 +24,6 @@ mod tests { SystemTimeSlotClock, CachingEth1Backend, MinimalEthSpec, - NullEventHandler, MemoryStore, MemoryStore, >; @@ -61,7 +59,6 @@ mod tests { .expect("should build state using recent genesis") .dummy_eth1_backend() .expect("should build dummy backend") - .null_event_handler() .slot_clock(SystemTimeSlotClock::new( Slot::new(0), Duration::from_secs(recent_genesis_time()), diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 5c1c61323..2594791af 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -1,4 +1,5 @@ use crate::{metrics, service::NetworkMessage, sync::SyncMessage}; + use beacon_chain::{ attestation_verification::Error as AttnError, observed_operations::ObservationOutcome, BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError, @@ -371,6 +372,7 @@ impl Worker { self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); self.chain.import_voluntary_exit(exit); + debug!(self.log, "Successfully imported voluntary exit"); metrics::inc_counter(&metrics::BEACON_PROCESSOR_EXIT_IMPORTED_TOTAL); diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 492c8ceb6..c5d2cf4c6 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -232,29 +232,6 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { address of this server (e.g., http://localhost:5054).") .takes_value(true), ) - /* Websocket related arguments */ - .arg( - Arg::with_name("ws") - .long("ws") - .help("Enable the websocket server. Disabled by default.") - .takes_value(false), - ) - .arg( - Arg::with_name("ws-address") - .long("ws-address") - .value_name("ADDRESS") - .help("Set the listen address for the websocket server.") - .default_value("127.0.0.1") - .takes_value(true), - ) - .arg( - Arg::with_name("ws-port") - .long("ws-port") - .value_name("PORT") - .help("Set the listen TCP port for the websocket server.") - .default_value("5053") - .takes_value(true), - ) /* * Standard staking flags diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 485068c50..926ff54cc 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -150,26 +150,6 @@ pub fn get_config( ); } - /* - * Websocket server - */ - - if cli_args.is_present("ws") { - client_config.websocket_server.enabled = true; - } - - if let Some(address) = cli_args.value_of("ws-address") { - client_config.websocket_server.listen_address = address - .parse::() - .map_err(|_| "ws-address is not a valid IPv4 address.")?; - } - - if let Some(port) = cli_args.value_of("ws-port") { - client_config.websocket_server.port = port - .parse::() - .map_err(|_| "ws-port is not a valid u16.")?; - } - /* * Eth1 */ @@ -250,7 +230,6 @@ pub fn get_config( unused_port("udp").map_err(|e| format!("Failed to get port for discovery: {}", e))?; client_config.http_api.listen_port = 0; client_config.http_metrics.listen_port = 0; - client_config.websocket_server.port = 0; } /* diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index d821f7cf3..a5dc2f07b 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -5,18 +5,16 @@ mod cli; mod config; pub use beacon_chain; -pub use cli::cli_app; -pub use client::{Client, ClientBuilder, ClientConfig, ClientGenesis}; -pub use config::{get_config, get_data_dir, get_eth2_testnet_config, set_network_config}; -pub use eth2_config::Eth2Config; - -use beacon_chain::events::TeeEventHandler; use beacon_chain::store::LevelDB; use beacon_chain::{ builder::Witness, eth1_chain::CachingEth1Backend, slot_clock::SystemTimeSlotClock, }; use clap::ArgMatches; +pub use cli::cli_app; +pub use client::{Client, ClientBuilder, ClientConfig, ClientGenesis}; +pub use config::{get_config, get_data_dir, get_eth2_testnet_config, set_network_config}; use environment::RuntimeContext; +pub use eth2_config::Eth2Config; use slasher::Slasher; use slog::{info, warn}; use std::ops::{Deref, DerefMut}; @@ -24,16 +22,8 @@ use std::sync::Arc; use types::EthSpec; /// A type-alias to the tighten the definition of a production-intended `Client`. -pub type ProductionClient = Client< - Witness< - SystemTimeSlotClock, - CachingEth1Backend, - E, - TeeEventHandler, - LevelDB, - LevelDB, - >, ->; +pub type ProductionClient = + Client, E, LevelDB, LevelDB>>; /// The beacon node `Client` that will be used in production. /// @@ -121,9 +111,7 @@ impl ProductionBeaconNode { builder.no_eth1_backend()? }; - let (builder, _events) = builder - .system_time_slot_clock()? - .tee_event_handler(client_config.websocket_server.clone())?; + let builder = builder.system_time_slot_clock()?; // Inject the executor into the discv5 network config. let discv5_executor = Discv5Executor(executor); diff --git a/beacon_node/websocket_server/Cargo.toml b/beacon_node/websocket_server/Cargo.toml index c39c785b9..e69de29bb 100644 --- a/beacon_node/websocket_server/Cargo.toml +++ b/beacon_node/websocket_server/Cargo.toml @@ -1,17 +0,0 @@ -[package] -name = "websocket_server" -version = "0.2.0" -authors = ["Paul Hauner "] -edition = "2018" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -futures = "0.3.7" -serde = "1.0.116" -serde_derive = "1.0.116" -slog = "2.5.2" -tokio = { version = "0.3.2", features = ["full"] } -types = { path = "../../consensus/types" } -ws = "0.9.1" -task_executor = { path = "../../common/task_executor" } diff --git a/beacon_node/websocket_server/src/config.rs b/beacon_node/websocket_server/src/config.rs deleted file mode 100644 index 8b693136c..000000000 --- a/beacon_node/websocket_server/src/config.rs +++ /dev/null @@ -1,22 +0,0 @@ -use serde_derive::{Deserialize, Serialize}; -use std::net::Ipv4Addr; - -/// The core configuration of a Lighthouse beacon node. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Config { - pub enabled: bool, - /// The IPv4 address the REST API HTTP server will listen on. - pub listen_address: Ipv4Addr, - /// The port the REST API HTTP server will listen on. - pub port: u16, -} - -impl Default for Config { - fn default() -> Self { - Config { - enabled: false, - listen_address: Ipv4Addr::new(127, 0, 0, 1), - port: 5053, - } - } -} diff --git a/beacon_node/websocket_server/src/lib.rs b/beacon_node/websocket_server/src/lib.rs index 82bbd2619..e69de29bb 100644 --- a/beacon_node/websocket_server/src/lib.rs +++ b/beacon_node/websocket_server/src/lib.rs @@ -1,118 +0,0 @@ -use slog::{debug, error, info, warn}; -use std::marker::PhantomData; -use std::net::SocketAddr; -use types::EthSpec; -use ws::{Sender, WebSocket}; - -mod config; - -pub use config::Config; - -pub struct WebSocketSender { - sender: Option, - _phantom: PhantomData, -} - -impl WebSocketSender { - /// Creates a dummy websocket server that never starts and where all future calls are no-ops. - pub fn dummy() -> Self { - Self { - sender: None, - _phantom: PhantomData, - } - } - - pub fn send_string(&self, string: String) -> Result<(), String> { - if let Some(sender) = &self.sender { - sender - .send(string) - .map_err(|e| format!("Unable to broadcast to websocket clients: {:?}", e)) - } else { - Ok(()) - } - } -} - -pub fn start_server( - executor: task_executor::TaskExecutor, - config: &Config, -) -> Result<(WebSocketSender, SocketAddr), String> { - let log = executor.log(); - let server_string = format!("{}:{}", config.listen_address, config.port); - - // Create a server that simply ignores any incoming messages. - let server = WebSocket::new(|_| |_| Ok(())) - .map_err(|e| format!("Failed to initialize websocket server: {:?}", e))? - .bind(server_string.clone()) - .map_err(|e| { - format!( - "Failed to bind websocket server to {}: {:?}", - server_string, e - ) - })?; - - let actual_listen_addr = server.local_addr().map_err(|e| { - format!( - "Failed to read listening addr from websocket server: {:?}", - e - ) - })?; - - let broadcaster = server.broadcaster(); - - // Produce a signal/channel that can gracefully shutdown the websocket server. - let exit = executor.exit(); - let log_inner = log.clone(); - let broadcaster_inner = server.broadcaster(); - let exit_future = async move { - let _ = exit.await; - if let Err(e) = broadcaster_inner.shutdown() { - warn!( - log_inner, - "Websocket server errored on shutdown"; - "error" => format!("{:?}", e) - ); - } else { - info!(log_inner, "Websocket server shutdown"); - } - }; - - // Place a future on the handle that will shutdown the websocket server when the - // application exits. - - executor.spawn(exit_future, "Websocket exit"); - - let log_inner = log.clone(); - let server_future = move || match server.run() { - Ok(_) => { - debug!( - log_inner, - "Websocket server thread stopped"; - ); - } - Err(e) => { - error!( - log_inner, - "Websocket server failed to start"; - "error" => format!("{:?}", e) - ); - } - }; - - executor.spawn_blocking(server_future, "Websocket server"); - - info!( - log, - "WebSocket server started"; - "address" => format!("{}", actual_listen_addr.ip()), - "port" => actual_listen_addr.port(), - ); - - Ok(( - WebSocketSender { - sender: Some(broadcaster), - _phantom: PhantomData, - }, - actual_listen_addr, - )) -} diff --git a/common/eth2/Cargo.toml b/common/eth2/Cargo.toml index 9bd37a9b1..547150353 100644 --- a/common/eth2/Cargo.toml +++ b/common/eth2/Cargo.toml @@ -11,7 +11,7 @@ serde = { version = "1.0.116", features = ["derive"] } serde_json = "1.0.58" types = { path = "../../consensus/types" } hex = "0.4.2" -reqwest = { version = "0.10.8", features = ["json"] } +reqwest = { version = "0.10.8", features = ["json","stream"] } eth2_libp2p = { path = "../../beacon_node/eth2_libp2p" } proto_array = { path = "../../consensus/proto_array", optional = true } serde_utils = { path = "../../consensus/serde_utils" } @@ -23,6 +23,8 @@ bytes = "0.5.6" account_utils = { path = "../../common/account_utils" } eth2_ssz = "0.1.2" eth2_ssz_derive = "0.1.0" +futures-util = "0.3.8" +futures = "0.3.8" [target.'cfg(target_os = "linux")'.dependencies] psutil = { version = "3.2.0", optional = true } diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index c6191b7ff..d5104b079 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -13,14 +13,16 @@ pub mod lighthouse_vc; pub mod types; use self::types::*; +use eth2_libp2p::PeerId; +use futures::Stream; +use futures_util::StreamExt; +pub use reqwest; use reqwest::{IntoUrl, Response}; +pub use reqwest::{StatusCode, Url}; use serde::{de::DeserializeOwned, Serialize}; use std::convert::TryFrom; use std::fmt; - -use eth2_libp2p::PeerId; -pub use reqwest; -pub use reqwest::{StatusCode, Url}; +use std::iter::Iterator; #[derive(Debug)] pub enum Error { @@ -42,6 +44,8 @@ pub enum Error { MissingSignatureHeader, /// The server returned an invalid JSON response. InvalidJson(serde_json::Error), + /// The server returned an invalid server-sent event. + InvalidServerSentEvent(String), /// The server returned an invalid SSZ response. InvalidSsz(ssz::DecodeError), } @@ -59,6 +63,7 @@ impl Error { Error::InvalidSignatureHeader => None, Error::MissingSignatureHeader => None, Error::InvalidJson(_) => None, + Error::InvalidServerSentEvent(_) => None, Error::InvalidSsz(_) => None, } } @@ -826,7 +831,7 @@ impl BeaconNodeHttpClient { pub async fn get_validator_duties_proposer( &self, epoch: Epoch, - ) -> Result>, Error> { + ) -> Result>, Error> { let mut path = self.eth_path()?; path.path_segments_mut() @@ -913,7 +918,7 @@ impl BeaconNodeHttpClient { &self, epoch: Epoch, indices: &[u64], - ) -> Result>, Error> { + ) -> Result>, Error> { let mut path = self.eth_path()?; path.path_segments_mut() @@ -966,6 +971,36 @@ impl BeaconNodeHttpClient { Ok(()) } + + /// `GET events?topics` + pub async fn get_events( + &self, + topic: &[EventTopic], + ) -> Result, Error>>, Error> { + let mut path = self.eth_path()?; + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("events"); + + let topic_string = topic + .iter() + .map(|i| i.to_string()) + .collect::>() + .join(","); + path.query_pairs_mut().append_pair("topics", &topic_string); + + Ok(self + .client + .get(path) + .send() + .await + .map_err(Error::Reqwest)? + .bytes_stream() + .map(|next| match next { + Ok(bytes) => EventKind::from_sse_bytes(bytes.as_ref()), + Err(e) => Err(Error::Reqwest(e)), + })) + } } /// Returns `Ok(response)` if the response is a `200 OK` response. Otherwise, creates an diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 0bed06f69..b37e3de43 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -1,12 +1,12 @@ //! This module exposes a superset of the `types` crate. It adds additional types that are only //! required for the HTTP API. +use crate::Error as ServerError; use eth2_libp2p::{ConnectionDirection, Enr, Multiaddr, PeerConnectionStatus}; use serde::{Deserialize, Serialize}; use std::convert::TryFrom; use std::fmt; -use std::str::FromStr; - +use std::str::{from_utf8, FromStr}; pub use types::*; /// An API error serializable to JSON. @@ -147,6 +147,13 @@ impl fmt::Display for StateId { } } +#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] +#[serde(bound = "T: Serialize + serde::de::DeserializeOwned")] +pub struct DutiesResponse { + pub dependent_root: Hash256, + pub data: T, +} + #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] #[serde(bound = "T: Serialize + serde::de::DeserializeOwned")] pub struct GenericResponse { @@ -638,6 +645,129 @@ pub struct PeerCount { pub disconnecting: u64, } +// --------- Server Sent Event Types ----------- + +#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] +pub struct SseBlock { + pub slot: Slot, + pub block: Hash256, +} + +#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] +pub struct SseFinalizedCheckpoint { + pub block: Hash256, + pub state: Hash256, + pub epoch: Epoch, +} + +#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] +pub struct SseHead { + pub slot: Slot, + pub block: Hash256, + pub state: Hash256, + pub current_duty_dependent_root: Hash256, + pub previous_duty_dependent_root: Hash256, + pub epoch_transition: bool, +} + +#[derive(PartialEq, Debug, Serialize, Clone)] +#[serde(bound = "T: EthSpec", untagged)] +pub enum EventKind { + Attestation(Attestation), + Block(SseBlock), + FinalizedCheckpoint(SseFinalizedCheckpoint), + Head(SseHead), + VoluntaryExit(SignedVoluntaryExit), +} + +impl EventKind { + pub fn from_sse_bytes(message: &[u8]) -> Result { + let s = from_utf8(message) + .map_err(|e| ServerError::InvalidServerSentEvent(format!("{:?}", e)))?; + + let mut split = s.split('\n'); + let event = split + .next() + .ok_or_else(|| { + ServerError::InvalidServerSentEvent("Could not parse event tag".to_string()) + })? + .trim_start_matches("event:"); + let data = split + .next() + .ok_or_else(|| { + ServerError::InvalidServerSentEvent("Could not parse data tag".to_string()) + })? + .trim_start_matches("data:"); + + match event { + "attestation" => Ok(EventKind::Attestation(serde_json::from_str(data).map_err( + |e| ServerError::InvalidServerSentEvent(format!("Attestation: {:?}", e)), + )?)), + "block" => Ok(EventKind::Block(serde_json::from_str(data).map_err( + |e| ServerError::InvalidServerSentEvent(format!("Block: {:?}", e)), + )?)), + "finalized_checkpoint" => Ok(EventKind::FinalizedCheckpoint( + serde_json::from_str(data).map_err(|e| { + ServerError::InvalidServerSentEvent(format!("Finalized Checkpoint: {:?}", e)) + })?, + )), + "head" => Ok(EventKind::Head(serde_json::from_str(data).map_err( + |e| ServerError::InvalidServerSentEvent(format!("Head: {:?}", e)), + )?)), + "voluntary_exit" => Ok(EventKind::VoluntaryExit( + serde_json::from_str(data).map_err(|e| { + ServerError::InvalidServerSentEvent(format!("Voluntary Exit: {:?}", e)) + })?, + )), + _ => Err(ServerError::InvalidServerSentEvent( + "Could not parse event tag".to_string(), + )), + } + } +} + +#[derive(Clone, Deserialize)] +pub struct EventQuery { + pub topics: QueryVec, +} + +#[derive(Debug, Clone, Copy, PartialEq, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum EventTopic { + Head, + Block, + Attestation, + VoluntaryExit, + FinalizedCheckpoint, +} + +impl FromStr for EventTopic { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + "head" => Ok(EventTopic::Head), + "block" => Ok(EventTopic::Block), + "attestation" => Ok(EventTopic::Attestation), + "voluntary_exit" => Ok(EventTopic::VoluntaryExit), + "finalized_checkpoint" => Ok(EventTopic::FinalizedCheckpoint), + _ => Err("event topic cannot be parsed.".to_string()), + } + } +} + +impl fmt::Display for EventTopic { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + EventTopic::Head => write!(f, "head"), + EventTopic::Block => write!(f, "block"), + EventTopic::Attestation => write!(f, "attestation"), + EventTopic::VoluntaryExit => write!(f, "voluntary_exit"), + EventTopic::FinalizedCheckpoint => write!(f, "finalized_checkpoint"), + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/common/warp_utils/src/reject.rs b/common/warp_utils/src/reject.rs index 9a5a8ea5c..f5ce1156e 100644 --- a/common/warp_utils/src/reject.rs +++ b/common/warp_utils/src/reject.rs @@ -1,7 +1,24 @@ use eth2::types::{ErrorMessage, Failure, IndexedErrorMessage}; use std::convert::Infallible; +use std::error::Error; +use std::fmt; use warp::{http::StatusCode, reject::Reject}; +#[derive(Debug)] +pub struct ServerSentEventError(pub String); + +impl Error for ServerSentEventError {} + +impl fmt::Display for ServerSentEventError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{:?}", self) + } +} + +pub fn server_sent_event_error(s: String) -> ServerSentEventError { + ServerSentEventError(s) +} + #[derive(Debug)] pub struct BeaconChainError(pub beacon_chain::BeaconChainError); diff --git a/testing/node_test_rig/src/lib.rs b/testing/node_test_rig/src/lib.rs index 20e9c8ba9..c584e5e36 100644 --- a/testing/node_test_rig/src/lib.rs +++ b/testing/node_test_rig/src/lib.rs @@ -89,8 +89,6 @@ pub fn testing_client_config() -> ClientConfig { client_config.network.upnp_enabled = false; client_config.http_api.enabled = true; client_config.http_api.listen_port = 0; - client_config.websocket_server.enabled = true; - client_config.websocket_server.port = 0; client_config.dummy_eth1_backend = true;