use crate::{http_metrics::metrics, validator_store::ValidatorStore}; use environment::RuntimeContext; use eth2::{types::Graffiti, BeaconNodeHttpClient}; use futures::channel::mpsc::Receiver; use futures::{StreamExt, TryFutureExt}; use slog::{crit, debug, error, info, trace, warn}; use slot_clock::SlotClock; use std::ops::Deref; use std::sync::Arc; use types::{EthSpec, PublicKey, Slot}; /// Builds a `BlockService`. pub struct BlockServiceBuilder { validator_store: Option>, slot_clock: Option>, beacon_node: Option, context: Option>, graffiti: Option, } impl BlockServiceBuilder { pub fn new() -> Self { Self { validator_store: None, slot_clock: None, beacon_node: None, context: None, graffiti: None, } } pub fn validator_store(mut self, store: ValidatorStore) -> Self { self.validator_store = Some(store); self } pub fn slot_clock(mut self, slot_clock: T) -> Self { self.slot_clock = Some(Arc::new(slot_clock)); self } pub fn beacon_node(mut self, beacon_node: BeaconNodeHttpClient) -> Self { self.beacon_node = Some(beacon_node); self } pub fn runtime_context(mut self, context: RuntimeContext) -> Self { self.context = Some(context); self } pub fn graffiti(mut self, graffiti: Option) -> Self { self.graffiti = graffiti; self } pub fn build(self) -> Result, String> { Ok(BlockService { inner: Arc::new(Inner { validator_store: self .validator_store .ok_or("Cannot build BlockService without validator_store")?, slot_clock: self .slot_clock .ok_or("Cannot build BlockService without slot_clock")?, beacon_node: self .beacon_node .ok_or("Cannot build BlockService without beacon_node")?, context: self .context .ok_or("Cannot build BlockService without runtime_context")?, graffiti: self.graffiti, }), }) } } /// Helper to minimise `Arc` usage. pub struct Inner { validator_store: ValidatorStore, slot_clock: Arc, beacon_node: BeaconNodeHttpClient, context: RuntimeContext, graffiti: Option, } /// Attempts to produce attestations for any block producer(s) at the start of the epoch. pub struct BlockService { inner: Arc>, } impl Clone for BlockService { fn clone(&self) -> Self { Self { inner: self.inner.clone(), } } } impl Deref for BlockService { type Target = Inner; fn deref(&self) -> &Self::Target { self.inner.deref() } } /// Notification from the duties service that we should try to produce a block. pub struct BlockServiceNotification { pub slot: Slot, pub block_proposers: Vec, } impl BlockService { pub fn start_update_service( self, notification_rx: Receiver, ) -> Result<(), String> { let log = self.context.log().clone(); info!(log, "Block production service started"); let executor = self.inner.context.executor.clone(); let block_service_fut = notification_rx.for_each(move |notif| { let service = self.clone(); async move { service.do_update(notif).await.ok(); } }); executor.spawn(block_service_fut, "block_service"); Ok(()) } /// Attempt to produce a block for any block producers in the `ValidatorStore`. async fn do_update(&self, notification: BlockServiceNotification) -> Result<(), ()> { let log = self.context.log(); let _timer = metrics::start_timer_vec(&metrics::BLOCK_SERVICE_TIMES, &[metrics::FULL_UPDATE]); let slot = self.slot_clock.now().ok_or_else(move || { crit!(log, "Duties manager failed to read slot clock"); })?; if notification.slot != slot { warn!( log, "Skipping block production for expired slot"; "current_slot" => slot.as_u64(), "notification_slot" => notification.slot.as_u64(), "info" => "Your machine could be overloaded" ); return Ok(()); } if slot == self.context.eth2_config.spec.genesis_slot { debug!( log, "Not producing block at genesis slot"; "proposers" => format!("{:?}", notification.block_proposers), ); return Ok(()); } trace!( log, "Block service update started"; "slot" => slot.as_u64() ); let proposers = notification.block_proposers; if proposers.is_empty() { trace!( log, "No local block proposers for this slot"; "slot" => slot.as_u64() ) } else if proposers.len() > 1 { error!( log, "Multiple block proposers for this slot"; "action" => "producing blocks for all proposers", "num_proposers" => proposers.len(), "slot" => slot.as_u64(), ) } for validator_pubkey in proposers { let service = self.clone(); let log = log.clone(); self.inner.context.executor.spawn( service .publish_block(slot, validator_pubkey) .unwrap_or_else(move |e| { crit!( log, "Error whilst producing block"; "message" => e ); }), "block service", ); } Ok(()) } /// Produce a block at the given slot for validator_pubkey async fn publish_block(self, slot: Slot, validator_pubkey: PublicKey) -> Result<(), String> { let log = self.context.log(); let _timer = metrics::start_timer_vec(&metrics::BLOCK_SERVICE_TIMES, &[metrics::BEACON_BLOCK]); let current_slot = self .slot_clock .now() .ok_or("Unable to determine current slot from clock")?; let randao_reveal = self .validator_store .randao_reveal(&validator_pubkey, slot.epoch(E::slots_per_epoch())) .ok_or("Unable to produce randao reveal")?; let block = self .beacon_node .get_validator_blocks(slot, randao_reveal.into(), self.graffiti.as_ref()) .await .map_err(|e| format!("Error from beacon node when producing block: {:?}", e))? .data; let signed_block = self .validator_store .sign_block(&validator_pubkey, block, current_slot) .ok_or("Unable to sign block")?; self.beacon_node .post_beacon_blocks(&signed_block) .await .map_err(|e| format!("Error from beacon node when publishing block: {:?}", e))?; info!( log, "Successfully published block"; "deposits" => signed_block.message.body.deposits.len(), "attestations" => signed_block.message.body.attestations.len(), "slot" => signed_block.slot().as_u64(), ); Ok(()) } }