Adds beacon chain events, websocket event handler
This commit is contained in:
parent
110e627d7b
commit
07990e0e92
@ -1,6 +1,7 @@
|
|||||||
use crate::checkpoint::CheckPoint;
|
use crate::checkpoint::CheckPoint;
|
||||||
use crate::errors::{BeaconChainError as Error, BlockProductionError};
|
use crate::errors::{BeaconChainError as Error, BlockProductionError};
|
||||||
use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend};
|
use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend};
|
||||||
|
use crate::events::{EventHandler, EventKind};
|
||||||
use crate::fork_choice::{Error as ForkChoiceError, ForkChoice};
|
use crate::fork_choice::{Error as ForkChoiceError, ForkChoice};
|
||||||
use crate::iter::{ReverseBlockRootIterator, ReverseStateRootIterator};
|
use crate::iter::{ReverseBlockRootIterator, ReverseStateRootIterator};
|
||||||
use crate::metrics;
|
use crate::metrics;
|
||||||
@ -95,6 +96,7 @@ pub trait BeaconChainTypes: Send + Sync + 'static {
|
|||||||
type LmdGhost: LmdGhost<Self::Store, Self::EthSpec>;
|
type LmdGhost: LmdGhost<Self::Store, Self::EthSpec>;
|
||||||
type Eth1Chain: Eth1ChainBackend<Self::EthSpec>;
|
type Eth1Chain: Eth1ChainBackend<Self::EthSpec>;
|
||||||
type EthSpec: types::EthSpec;
|
type EthSpec: types::EthSpec;
|
||||||
|
type EventHandler: EventHandler<Self::EthSpec>;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Represents the "Beacon Chain" component of Ethereum 2.0. Allows import of blocks and block
|
/// Represents the "Beacon Chain" component of Ethereum 2.0. Allows import of blocks and block
|
||||||
@ -117,6 +119,8 @@ pub struct BeaconChain<T: BeaconChainTypes> {
|
|||||||
/// A state-machine that is updated with information from the network and chooses a canonical
|
/// A state-machine that is updated with information from the network and chooses a canonical
|
||||||
/// head block.
|
/// head block.
|
||||||
pub fork_choice: ForkChoice<T>,
|
pub fork_choice: ForkChoice<T>,
|
||||||
|
/// A handler for events generated by the beacon chain.
|
||||||
|
pub event_handler: T::EventHandler,
|
||||||
/// Logging to CLI, etc.
|
/// Logging to CLI, etc.
|
||||||
log: Logger,
|
log: Logger,
|
||||||
}
|
}
|
||||||
@ -126,6 +130,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
pub fn from_genesis(
|
pub fn from_genesis(
|
||||||
store: Arc<T::Store>,
|
store: Arc<T::Store>,
|
||||||
eth1_backend: T::Eth1Chain,
|
eth1_backend: T::Eth1Chain,
|
||||||
|
event_handler: T::EventHandler,
|
||||||
mut genesis_state: BeaconState<T::EthSpec>,
|
mut genesis_state: BeaconState<T::EthSpec>,
|
||||||
mut genesis_block: BeaconBlock<T::EthSpec>,
|
mut genesis_block: BeaconBlock<T::EthSpec>,
|
||||||
spec: ChainSpec,
|
spec: ChainSpec,
|
||||||
@ -174,6 +179,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
canonical_head,
|
canonical_head,
|
||||||
genesis_block_root,
|
genesis_block_root,
|
||||||
fork_choice: ForkChoice::new(store.clone(), &genesis_block, genesis_block_root),
|
fork_choice: ForkChoice::new(store.clone(), &genesis_block, genesis_block_root),
|
||||||
|
event_handler,
|
||||||
store,
|
store,
|
||||||
log,
|
log,
|
||||||
})
|
})
|
||||||
@ -183,6 +189,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
pub fn from_store(
|
pub fn from_store(
|
||||||
store: Arc<T::Store>,
|
store: Arc<T::Store>,
|
||||||
eth1_backend: T::Eth1Chain,
|
eth1_backend: T::Eth1Chain,
|
||||||
|
event_handler: T::EventHandler,
|
||||||
spec: ChainSpec,
|
spec: ChainSpec,
|
||||||
log: Logger,
|
log: Logger,
|
||||||
) -> Result<Option<BeaconChain<T>>, Error> {
|
) -> Result<Option<BeaconChain<T>>, Error> {
|
||||||
@ -219,6 +226,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
slot_clock,
|
slot_clock,
|
||||||
fork_choice: ForkChoice::new(store.clone(), last_finalized_block, last_finalized_root),
|
fork_choice: ForkChoice::new(store.clone(), last_finalized_block, last_finalized_root),
|
||||||
op_pool,
|
op_pool,
|
||||||
|
event_handler,
|
||||||
eth1_chain: Eth1Chain::new(eth1_backend),
|
eth1_chain: Eth1Chain::new(eth1_backend),
|
||||||
canonical_head: RwLock::new(p.canonical_head),
|
canonical_head: RwLock::new(p.canonical_head),
|
||||||
genesis_block_root: p.genesis_block_root,
|
genesis_block_root: p.genesis_block_root,
|
||||||
@ -629,6 +637,59 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
pub fn process_attestation(
|
pub fn process_attestation(
|
||||||
&self,
|
&self,
|
||||||
attestation: Attestation<T::EthSpec>,
|
attestation: Attestation<T::EthSpec>,
|
||||||
|
) -> Result<AttestationProcessingOutcome, Error> {
|
||||||
|
let outcome = self.process_attestation_internal(attestation.clone());
|
||||||
|
|
||||||
|
match &outcome {
|
||||||
|
Ok(outcome) => match outcome {
|
||||||
|
AttestationProcessingOutcome::Processed => {
|
||||||
|
trace!(
|
||||||
|
self.log,
|
||||||
|
"Beacon attestation imported";
|
||||||
|
"shard" => attestation.data.crosslink.shard,
|
||||||
|
"target_epoch" => attestation.data.target.epoch,
|
||||||
|
);
|
||||||
|
let _ = self
|
||||||
|
.event_handler
|
||||||
|
.register(EventKind::BeaconAttestationImported {
|
||||||
|
attestation: Box::new(attestation),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
other => {
|
||||||
|
warn!(
|
||||||
|
self.log,
|
||||||
|
"Beacon attestation rejected";
|
||||||
|
"reason" => format!("{:?}", other),
|
||||||
|
);
|
||||||
|
let _ = self
|
||||||
|
.event_handler
|
||||||
|
.register(EventKind::BeaconAttestationRejected {
|
||||||
|
reason: format!("Invalid attestation: {:?}", other),
|
||||||
|
attestation: Box::new(attestation),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
error!(
|
||||||
|
self.log,
|
||||||
|
"Beacon attestation processing error";
|
||||||
|
"error" => format!("{:?}", e),
|
||||||
|
);
|
||||||
|
let _ = self
|
||||||
|
.event_handler
|
||||||
|
.register(EventKind::BeaconAttestationRejected {
|
||||||
|
reason: format!("Internal error: {:?}", e),
|
||||||
|
attestation: Box::new(attestation),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
outcome
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn process_attestation_internal(
|
||||||
|
&self,
|
||||||
|
attestation: Attestation<T::EthSpec>,
|
||||||
) -> Result<AttestationProcessingOutcome, Error> {
|
) -> Result<AttestationProcessingOutcome, Error> {
|
||||||
metrics::inc_counter(&metrics::ATTESTATION_PROCESSING_REQUESTS);
|
metrics::inc_counter(&metrics::ATTESTATION_PROCESSING_REQUESTS);
|
||||||
let timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_TIMES);
|
let timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_TIMES);
|
||||||
@ -932,6 +993,57 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
pub fn process_block(
|
pub fn process_block(
|
||||||
&self,
|
&self,
|
||||||
block: BeaconBlock<T::EthSpec>,
|
block: BeaconBlock<T::EthSpec>,
|
||||||
|
) -> Result<BlockProcessingOutcome, Error> {
|
||||||
|
let outcome = self.process_block_internal(block.clone());
|
||||||
|
|
||||||
|
match &outcome {
|
||||||
|
Ok(outcome) => match outcome {
|
||||||
|
BlockProcessingOutcome::Processed { block_root } => {
|
||||||
|
trace!(
|
||||||
|
self.log,
|
||||||
|
"Beacon block imported";
|
||||||
|
"block_root" => format!("{:?}", block_root),
|
||||||
|
"block_slot" => format!("{:?}", block_root),
|
||||||
|
);
|
||||||
|
let _ = self.event_handler.register(EventKind::BeaconBlockImported {
|
||||||
|
block_root: *block_root,
|
||||||
|
block: Box::new(block),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
other => {
|
||||||
|
warn!(
|
||||||
|
self.log,
|
||||||
|
"Beacon block rejected";
|
||||||
|
"reason" => format!("{:?}", other),
|
||||||
|
);
|
||||||
|
let _ = self.event_handler.register(EventKind::BeaconBlockRejected {
|
||||||
|
reason: format!("Invalid block: {:?}", other),
|
||||||
|
block: Box::new(block),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
error!(
|
||||||
|
self.log,
|
||||||
|
"Beacon block processing error";
|
||||||
|
"error" => format!("{:?}", e),
|
||||||
|
);
|
||||||
|
let _ = self.event_handler.register(EventKind::BeaconBlockRejected {
|
||||||
|
reason: format!("Internal error: {:?}", e),
|
||||||
|
block: Box::new(block),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
outcome
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Accept some block and attempt to add it to block DAG.
|
||||||
|
///
|
||||||
|
/// Will accept blocks from prior slots, however it will reject any block from a future slot.
|
||||||
|
fn process_block_internal(
|
||||||
|
&self,
|
||||||
|
block: BeaconBlock<T::EthSpec>,
|
||||||
) -> Result<BlockProcessingOutcome, Error> {
|
) -> Result<BlockProcessingOutcome, Error> {
|
||||||
metrics::inc_counter(&metrics::BLOCK_PROCESSING_REQUESTS);
|
metrics::inc_counter(&metrics::BLOCK_PROCESSING_REQUESTS);
|
||||||
let full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES);
|
let full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES);
|
||||||
|
@ -131,10 +131,11 @@ impl<T: BeaconChainTypes> BeaconChainBuilder<T> {
|
|||||||
self,
|
self,
|
||||||
store: Arc<T::Store>,
|
store: Arc<T::Store>,
|
||||||
eth1_backend: T::Eth1Chain,
|
eth1_backend: T::Eth1Chain,
|
||||||
|
event_handler: T::EventHandler,
|
||||||
) -> Result<BeaconChain<T>, String> {
|
) -> Result<BeaconChain<T>, String> {
|
||||||
Ok(match self.build_strategy {
|
Ok(match self.build_strategy {
|
||||||
BuildStrategy::LoadFromStore => {
|
BuildStrategy::LoadFromStore => {
|
||||||
BeaconChain::from_store(store, eth1_backend, self.spec, self.log)
|
BeaconChain::from_store(store, eth1_backend, event_handler, self.spec, self.log)
|
||||||
.map_err(|e| format!("Error loading BeaconChain from database: {:?}", e))?
|
.map_err(|e| format!("Error loading BeaconChain from database: {:?}", e))?
|
||||||
.ok_or_else(|| format!("Unable to find exising BeaconChain in database."))?
|
.ok_or_else(|| format!("Unable to find exising BeaconChain in database."))?
|
||||||
}
|
}
|
||||||
@ -144,6 +145,7 @@ impl<T: BeaconChainTypes> BeaconChainBuilder<T> {
|
|||||||
} => BeaconChain::from_genesis(
|
} => BeaconChain::from_genesis(
|
||||||
store,
|
store,
|
||||||
eth1_backend,
|
eth1_backend,
|
||||||
|
event_handler,
|
||||||
genesis_state.as_ref().clone(),
|
genesis_state.as_ref().clone(),
|
||||||
genesis_block.as_ref().clone(),
|
genesis_block.as_ref().clone(),
|
||||||
self.spec,
|
self.spec,
|
||||||
|
51
beacon_node/beacon_chain/src/events.rs
Normal file
51
beacon_node/beacon_chain/src/events.rs
Normal file
@ -0,0 +1,51 @@
|
|||||||
|
use serde_derive::{Deserialize, Serialize};
|
||||||
|
use std::marker::PhantomData;
|
||||||
|
use types::{Attestation, BeaconBlock, EthSpec, Hash256};
|
||||||
|
|
||||||
|
pub trait EventHandler<T: EthSpec>: Sized + Send + Sync {
|
||||||
|
fn register(&self, kind: EventKind<T>) -> Result<(), String>;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct NullEventHandler<T: EthSpec>(PhantomData<T>);
|
||||||
|
|
||||||
|
impl<T: EthSpec> EventHandler<T> for NullEventHandler<T> {
|
||||||
|
fn register(&self, _kind: EventKind<T>) -> Result<(), String> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: EthSpec> Default for NullEventHandler<T> {
|
||||||
|
fn default() -> Self {
|
||||||
|
NullEventHandler(PhantomData)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
#[serde(
|
||||||
|
bound = "T: EthSpec",
|
||||||
|
rename_all = "snake_case",
|
||||||
|
tag = "event",
|
||||||
|
content = "data"
|
||||||
|
)]
|
||||||
|
pub enum EventKind<T: EthSpec> {
|
||||||
|
BeaconHeadChanged {
|
||||||
|
reorg: bool,
|
||||||
|
current_head_beacon_block_root: Hash256,
|
||||||
|
previous_head_beacon_block_root: Hash256,
|
||||||
|
},
|
||||||
|
BeaconBlockImported {
|
||||||
|
block_root: Hash256,
|
||||||
|
block: Box<BeaconBlock<T>>,
|
||||||
|
},
|
||||||
|
BeaconBlockRejected {
|
||||||
|
reason: String,
|
||||||
|
block: Box<BeaconBlock<T>>,
|
||||||
|
},
|
||||||
|
BeaconAttestationImported {
|
||||||
|
attestation: Box<Attestation<T>>,
|
||||||
|
},
|
||||||
|
BeaconAttestationRejected {
|
||||||
|
reason: String,
|
||||||
|
attestation: Box<Attestation<T>>,
|
||||||
|
},
|
||||||
|
}
|
@ -7,6 +7,7 @@ mod beacon_chain_builder;
|
|||||||
mod checkpoint;
|
mod checkpoint;
|
||||||
mod errors;
|
mod errors;
|
||||||
mod eth1_chain;
|
mod eth1_chain;
|
||||||
|
pub mod events;
|
||||||
mod fork_choice;
|
mod fork_choice;
|
||||||
mod iter;
|
mod iter;
|
||||||
mod metrics;
|
mod metrics;
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
use crate::{
|
use crate::{
|
||||||
AttestationProcessingOutcome, BeaconChain, BeaconChainBuilder, BeaconChainTypes,
|
events::NullEventHandler, AttestationProcessingOutcome, BeaconChain, BeaconChainBuilder,
|
||||||
BlockProcessingOutcome, InteropEth1ChainBackend,
|
BeaconChainTypes, BlockProcessingOutcome, InteropEth1ChainBackend,
|
||||||
};
|
};
|
||||||
use lmd_ghost::LmdGhost;
|
use lmd_ghost::LmdGhost;
|
||||||
use rayon::prelude::*;
|
use rayon::prelude::*;
|
||||||
@ -68,6 +68,7 @@ where
|
|||||||
type LmdGhost = L;
|
type LmdGhost = L;
|
||||||
type Eth1Chain = InteropEth1ChainBackend<E>;
|
type Eth1Chain = InteropEth1ChainBackend<E>;
|
||||||
type EthSpec = E;
|
type EthSpec = E;
|
||||||
|
type EventHandler = NullEventHandler<E>;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A testing harness which can instantiate a `BeaconChain` and populate it with blocks and
|
/// A testing harness which can instantiate a `BeaconChain` and populate it with blocks and
|
||||||
@ -103,7 +104,11 @@ where
|
|||||||
let chain =
|
let chain =
|
||||||
BeaconChainBuilder::quick_start(HARNESS_GENESIS_TIME, &keypairs, spec.clone(), log)
|
BeaconChainBuilder::quick_start(HARNESS_GENESIS_TIME, &keypairs, spec.clone(), log)
|
||||||
.unwrap_or_else(|e| panic!("Failed to create beacon chain builder: {}", e))
|
.unwrap_or_else(|e| panic!("Failed to create beacon chain builder: {}", e))
|
||||||
.build(store.clone(), InteropEth1ChainBackend::default())
|
.build(
|
||||||
|
store.clone(),
|
||||||
|
InteropEth1ChainBackend::default(),
|
||||||
|
NullEventHandler::default(),
|
||||||
|
)
|
||||||
.unwrap_or_else(|e| panic!("Failed to build beacon chain: {}", e));
|
.unwrap_or_else(|e| panic!("Failed to build beacon chain: {}", e));
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
|
@ -6,6 +6,7 @@ edition = "2018"
|
|||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
beacon_chain = { path = "../beacon_chain" }
|
beacon_chain = { path = "../beacon_chain" }
|
||||||
|
store = { path = "../store" }
|
||||||
network = { path = "../network" }
|
network = { path = "../network" }
|
||||||
eth2-libp2p = { path = "../eth2-libp2p" }
|
eth2-libp2p = { path = "../eth2-libp2p" }
|
||||||
rpc = { path = "../rpc" }
|
rpc = { path = "../rpc" }
|
||||||
|
@ -20,18 +20,19 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
|
|||||||
use tokio::runtime::TaskExecutor;
|
use tokio::runtime::TaskExecutor;
|
||||||
use tokio::timer::Interval;
|
use tokio::timer::Interval;
|
||||||
use types::EthSpec;
|
use types::EthSpec;
|
||||||
|
use websocket_server::WebSocketSender;
|
||||||
|
|
||||||
pub use beacon_chain::{BeaconChainTypes, Eth1ChainBackend, InteropEth1ChainBackend};
|
pub use beacon_chain::{BeaconChainTypes, Eth1ChainBackend, InteropEth1ChainBackend};
|
||||||
pub use config::{BeaconChainStartMethod, Config as ClientConfig, Eth1BackendMethod};
|
pub use config::{BeaconChainStartMethod, Config as ClientConfig, Eth1BackendMethod};
|
||||||
pub use eth2_config::Eth2Config;
|
pub use eth2_config::Eth2Config;
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct ClientType<S: Store, E: EthSpec> {
|
pub struct RuntimeBeaconChainTypes<S: Store, E: EthSpec> {
|
||||||
_phantom_s: PhantomData<S>,
|
_phantom_s: PhantomData<S>,
|
||||||
_phantom_e: PhantomData<E>,
|
_phantom_e: PhantomData<E>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S, E> BeaconChainTypes for ClientType<S, E>
|
impl<S, E> BeaconChainTypes for RuntimeBeaconChainTypes<S, E>
|
||||||
where
|
where
|
||||||
S: Store + 'static,
|
S: Store + 'static,
|
||||||
E: EthSpec,
|
E: EthSpec,
|
||||||
@ -41,17 +42,22 @@ where
|
|||||||
type LmdGhost = ThreadSafeReducedTree<S, E>;
|
type LmdGhost = ThreadSafeReducedTree<S, E>;
|
||||||
type Eth1Chain = InteropEth1ChainBackend<E>;
|
type Eth1Chain = InteropEth1ChainBackend<E>;
|
||||||
type EthSpec = E;
|
type EthSpec = E;
|
||||||
|
type EventHandler = WebSocketSender<E>;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Main beacon node client service. This provides the connection and initialisation of the clients
|
/// Main beacon node client service. This provides the connection and initialisation of the clients
|
||||||
/// sub-services in multiple threads.
|
/// sub-services in multiple threads.
|
||||||
pub struct Client<T: BeaconChainTypes> {
|
pub struct Client<S, E>
|
||||||
|
where
|
||||||
|
S: Store + Clone + 'static,
|
||||||
|
E: EthSpec,
|
||||||
|
{
|
||||||
/// Configuration for the lighthouse client.
|
/// Configuration for the lighthouse client.
|
||||||
_client_config: ClientConfig,
|
_client_config: ClientConfig,
|
||||||
/// The beacon chain for the running client.
|
/// The beacon chain for the running client.
|
||||||
beacon_chain: Arc<BeaconChain<T>>,
|
beacon_chain: Arc<BeaconChain<RuntimeBeaconChainTypes<S, E>>>,
|
||||||
/// Reference to the network service.
|
/// Reference to the network service.
|
||||||
pub network: Arc<NetworkService<T>>,
|
pub network: Arc<NetworkService<RuntimeBeaconChainTypes<S, E>>>,
|
||||||
/// Signal to terminate the RPC server.
|
/// Signal to terminate the RPC server.
|
||||||
pub rpc_exit_signal: Option<Signal>,
|
pub rpc_exit_signal: Option<Signal>,
|
||||||
/// Signal to terminate the slot timer.
|
/// Signal to terminate the slot timer.
|
||||||
@ -60,19 +66,22 @@ pub struct Client<T: BeaconChainTypes> {
|
|||||||
pub api_exit_signal: Option<Signal>,
|
pub api_exit_signal: Option<Signal>,
|
||||||
/// The clients logger.
|
/// The clients logger.
|
||||||
log: slog::Logger,
|
log: slog::Logger,
|
||||||
|
/*
|
||||||
/// Marker to pin the beacon chain generics.
|
/// Marker to pin the beacon chain generics.
|
||||||
phantom: PhantomData<T>,
|
phantom: PhantomData<BeaconChainTypes>,
|
||||||
|
*/
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Client<T>
|
impl<S, E> Client<S, E>
|
||||||
where
|
where
|
||||||
T: BeaconChainTypes + Clone,
|
S: Store + Clone + 'static,
|
||||||
|
E: EthSpec,
|
||||||
{
|
{
|
||||||
/// Generate an instance of the client. Spawn and link all internal sub-processes.
|
/// Generate an instance of the client. Spawn and link all internal sub-processes.
|
||||||
pub fn new(
|
pub fn new(
|
||||||
client_config: ClientConfig,
|
client_config: ClientConfig,
|
||||||
eth2_config: Eth2Config,
|
eth2_config: Eth2Config,
|
||||||
store: T::Store,
|
store: S,
|
||||||
log: slog::Logger,
|
log: slog::Logger,
|
||||||
executor: &TaskExecutor,
|
executor: &TaskExecutor,
|
||||||
) -> error::Result<Self> {
|
) -> error::Result<Self> {
|
||||||
@ -169,11 +178,19 @@ where
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let eth1_backend = T::Eth1Chain::new(String::new()).map_err(|e| format!("{:?}", e))?;
|
let eth1_backend =
|
||||||
|
InteropEth1ChainBackend::new(String::new()).map_err(|e| format!("{:?}", e))?;
|
||||||
|
|
||||||
let beacon_chain: Arc<BeaconChain<T>> = Arc::new(
|
// Start the websocket server.
|
||||||
|
let websocket_sender: WebSocketSender<E> = if client_config.websocket_server.enabled {
|
||||||
|
websocket_server::start_server(&client_config.websocket_server, &log)?
|
||||||
|
} else {
|
||||||
|
WebSocketSender::dummy()
|
||||||
|
};
|
||||||
|
|
||||||
|
let beacon_chain: Arc<BeaconChain<RuntimeBeaconChainTypes<S, E>>> = Arc::new(
|
||||||
beacon_chain_builder
|
beacon_chain_builder
|
||||||
.build(store, eth1_backend)
|
.build(store, eth1_backend, websocket_sender)
|
||||||
.map_err(error::Error::from)?,
|
.map_err(error::Error::from)?,
|
||||||
);
|
);
|
||||||
|
|
||||||
@ -229,11 +246,6 @@ where
|
|||||||
None
|
None
|
||||||
};
|
};
|
||||||
|
|
||||||
// Start the websocket server
|
|
||||||
let _websocket_sender = if client_config.websocket_server.enabled {
|
|
||||||
websocket_server::start_server::<T::EthSpec>(&client_config.websocket_server, &log)?;
|
|
||||||
};
|
|
||||||
|
|
||||||
let (slot_timer_exit_signal, exit) = exit_future::signal();
|
let (slot_timer_exit_signal, exit) = exit_future::signal();
|
||||||
if let Some(duration_to_next_slot) = beacon_chain.slot_clock.duration_to_next_slot() {
|
if let Some(duration_to_next_slot) = beacon_chain.slot_clock.duration_to_next_slot() {
|
||||||
// set up the validator work interval - start at next slot and proceed every slot
|
// set up the validator work interval - start at next slot and proceed every slot
|
||||||
@ -268,12 +280,11 @@ where
|
|||||||
api_exit_signal,
|
api_exit_signal,
|
||||||
log,
|
log,
|
||||||
network,
|
network,
|
||||||
phantom: PhantomData,
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: BeaconChainTypes> Drop for Client<T> {
|
impl<S: Store + Clone, E: EthSpec> Drop for Client<S, E> {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
// Save the beacon chain to it's store before dropping.
|
// Save the beacon chain to it's store before dropping.
|
||||||
let _result = self.beacon_chain.persist();
|
let _result = self.beacon_chain.persist();
|
||||||
|
@ -1,11 +1,12 @@
|
|||||||
use crate::Client;
|
use crate::Client;
|
||||||
use beacon_chain::BeaconChainTypes;
|
|
||||||
use exit_future::Exit;
|
use exit_future::Exit;
|
||||||
use futures::{Future, Stream};
|
use futures::{Future, Stream};
|
||||||
use slog::{debug, o, warn};
|
use slog::{debug, o, warn};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
use store::Store;
|
||||||
use tokio::runtime::TaskExecutor;
|
use tokio::runtime::TaskExecutor;
|
||||||
use tokio::timer::Interval;
|
use tokio::timer::Interval;
|
||||||
|
use types::EthSpec;
|
||||||
|
|
||||||
/// The interval between heartbeat events.
|
/// The interval between heartbeat events.
|
||||||
pub const HEARTBEAT_INTERVAL_SECONDS: u64 = 15;
|
pub const HEARTBEAT_INTERVAL_SECONDS: u64 = 15;
|
||||||
@ -17,7 +18,11 @@ pub const WARN_PEER_COUNT: usize = 1;
|
|||||||
/// durations.
|
/// durations.
|
||||||
///
|
///
|
||||||
/// Presently unused, but remains for future use.
|
/// Presently unused, but remains for future use.
|
||||||
pub fn run<T: BeaconChainTypes>(client: &Client<T>, executor: TaskExecutor, exit: Exit) {
|
pub fn run<S, E>(client: &Client<S, E>, executor: TaskExecutor, exit: Exit)
|
||||||
|
where
|
||||||
|
S: Store + Clone + 'static,
|
||||||
|
E: EthSpec,
|
||||||
|
{
|
||||||
// notification heartbeat
|
// notification heartbeat
|
||||||
let interval = Interval::new(
|
let interval = Interval::new(
|
||||||
Instant::now(),
|
Instant::now(),
|
||||||
|
@ -1,19 +1,17 @@
|
|||||||
use client::{
|
use client::{error, notifier, Client, ClientConfig, Eth1BackendMethod, Eth2Config};
|
||||||
error, notifier, BeaconChainTypes, Client, ClientConfig, ClientType, Eth1BackendMethod,
|
|
||||||
Eth2Config,
|
|
||||||
};
|
|
||||||
use futures::sync::oneshot;
|
use futures::sync::oneshot;
|
||||||
use futures::Future;
|
use futures::Future;
|
||||||
use slog::{error, info};
|
use slog::{error, info};
|
||||||
use std::cell::RefCell;
|
use std::cell::RefCell;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
use store::Store;
|
||||||
use store::{DiskStore, MemoryStore};
|
use store::{DiskStore, MemoryStore};
|
||||||
use tokio::runtime::Builder;
|
use tokio::runtime::Builder;
|
||||||
use tokio::runtime::Runtime;
|
use tokio::runtime::Runtime;
|
||||||
use tokio::runtime::TaskExecutor;
|
use tokio::runtime::TaskExecutor;
|
||||||
use tokio_timer::clock::Clock;
|
use tokio_timer::clock::Clock;
|
||||||
use types::{InteropEthSpec, MainnetEthSpec, MinimalEthSpec};
|
use types::{EthSpec, InteropEthSpec, MainnetEthSpec, MinimalEthSpec};
|
||||||
|
|
||||||
/// Reads the configuration and initializes a `BeaconChain` with the required types and parameters.
|
/// Reads the configuration and initializes a `BeaconChain` with the required types and parameters.
|
||||||
///
|
///
|
||||||
@ -52,14 +50,7 @@ pub fn run_beacon_node(
|
|||||||
|
|
||||||
macro_rules! run_client {
|
macro_rules! run_client {
|
||||||
($store: ty, $eth_spec: ty) => {
|
($store: ty, $eth_spec: ty) => {
|
||||||
run::<ClientType<$store, $eth_spec>>(
|
run::<$store, $eth_spec>(&db_path, client_config, eth2_config, executor, runtime, log)
|
||||||
&db_path,
|
|
||||||
client_config,
|
|
||||||
eth2_config,
|
|
||||||
executor,
|
|
||||||
runtime,
|
|
||||||
log,
|
|
||||||
)
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -82,7 +73,7 @@ pub fn run_beacon_node(
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Performs the type-generic parts of launching a `BeaconChain`.
|
/// Performs the type-generic parts of launching a `BeaconChain`.
|
||||||
fn run<T>(
|
fn run<S, E>(
|
||||||
db_path: &Path,
|
db_path: &Path,
|
||||||
client_config: ClientConfig,
|
client_config: ClientConfig,
|
||||||
eth2_config: Eth2Config,
|
eth2_config: Eth2Config,
|
||||||
@ -91,12 +82,13 @@ fn run<T>(
|
|||||||
log: &slog::Logger,
|
log: &slog::Logger,
|
||||||
) -> error::Result<()>
|
) -> error::Result<()>
|
||||||
where
|
where
|
||||||
T: BeaconChainTypes + Clone,
|
S: Store + Clone + 'static + OpenDatabase,
|
||||||
T::Store: OpenDatabase,
|
E: EthSpec,
|
||||||
{
|
{
|
||||||
let store = T::Store::open_database(&db_path)?;
|
let store = S::open_database(&db_path)?;
|
||||||
|
|
||||||
let client: Client<T> = Client::new(client_config, eth2_config, store, log.clone(), &executor)?;
|
let client: Client<S, E> =
|
||||||
|
Client::new(client_config, eth2_config, store, log.clone(), &executor)?;
|
||||||
|
|
||||||
// run service until ctrl-c
|
// run service until ctrl-c
|
||||||
let (ctrlc_send, ctrlc_oneshot) = oneshot::channel();
|
let (ctrlc_send, ctrlc_oneshot) = oneshot::channel();
|
||||||
|
@ -7,10 +7,12 @@ edition = "2018"
|
|||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
beacon_chain = { path = "../beacon_chain" }
|
||||||
exit-future = "0.1.3"
|
exit-future = "0.1.3"
|
||||||
futures = "0.1.25"
|
futures = "0.1.25"
|
||||||
serde = "1.0"
|
serde = "1.0"
|
||||||
serde_derive = "1.0"
|
serde_derive = "1.0"
|
||||||
|
serde_json = "^1.0"
|
||||||
slog = "^2.2.3"
|
slog = "^2.2.3"
|
||||||
tokio = "0.1.16"
|
tokio = "0.1.16"
|
||||||
types = { path = "../../eth2/types" }
|
types = { path = "../../eth2/types" }
|
||||||
|
@ -1,5 +1,7 @@
|
|||||||
use serde_derive::{Deserialize, Serialize};
|
use beacon_chain::events::{EventHandler, EventKind};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
use slog::{error, info, Logger};
|
use slog::{error, info, Logger};
|
||||||
|
use std::marker::PhantomData;
|
||||||
use std::net::Ipv4Addr;
|
use std::net::Ipv4Addr;
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use types::EthSpec;
|
use types::EthSpec;
|
||||||
@ -25,19 +27,44 @@ impl Default for Config {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct WebSocketSender {
|
pub struct WebSocketSender<T: EthSpec> {
|
||||||
sender: Sender,
|
sender: Option<Sender>,
|
||||||
|
_phantom: PhantomData<T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: EthSpec> WebSocketSender<T> {
|
||||||
|
/// Creates a dummy websocket server that never starts and where all future calls are no-ops.
|
||||||
|
pub fn dummy() -> Self {
|
||||||
|
Self {
|
||||||
|
sender: None,
|
||||||
|
_phantom: PhantomData,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl WebSocketSender {
|
|
||||||
pub fn send_string(&self, string: String) -> Result<(), String> {
|
pub fn send_string(&self, string: String) -> Result<(), String> {
|
||||||
self.sender
|
if let Some(sender) = &self.sender {
|
||||||
|
sender
|
||||||
.send(string)
|
.send(string)
|
||||||
.map_err(|e| format!("Unable to broadcast to websocket clients: {:?}", e))
|
.map_err(|e| format!("Unable to broadcast to websocket clients: {:?}", e))
|
||||||
|
} else {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn start_server<T: EthSpec>(config: &Config, log: &Logger) -> Result<WebSocketSender, String> {
|
impl<T: EthSpec> EventHandler<T> for WebSocketSender<T> {
|
||||||
|
fn register(&self, kind: EventKind<T>) -> Result<(), String> {
|
||||||
|
self.send_string(
|
||||||
|
serde_json::to_string(&kind)
|
||||||
|
.map_err(|e| format!("Unable to serialize event: {:?}", e))?,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn start_server<T: EthSpec>(
|
||||||
|
config: &Config,
|
||||||
|
log: &Logger,
|
||||||
|
) -> Result<WebSocketSender<T>, String> {
|
||||||
let server_string = format!("{}:{}", config.listen_address, config.port);
|
let server_string = format!("{}:{}", config.listen_address, config.port);
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
@ -70,6 +97,7 @@ pub fn start_server<T: EthSpec>(config: &Config, log: &Logger) -> Result<WebSock
|
|||||||
});
|
});
|
||||||
|
|
||||||
Ok(WebSocketSender {
|
Ok(WebSocketSender {
|
||||||
sender: broadcaster,
|
sender: Some(broadcaster),
|
||||||
|
_phantom: PhantomData,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user