diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 5020406ae..f43f0403c 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -12,6 +12,7 @@ use crate::block_verification::{ signature_verify_chain_segment, BlockError, ExecutionPendingBlock, GossipVerifiedBlock, IntoExecutionPendingBlock, PayloadVerificationOutcome, POS_PANDA_BANNER, }; +pub use crate::canonical_head::{CanonicalHead, CanonicalHeadRwLock}; use crate::chain_config::ChainConfig; use crate::early_attester_cache::EarlyAttesterCache; use crate::errors::{BeaconChainError as Error, BlockProductionError}; @@ -57,6 +58,7 @@ use eth2::types::{EventKind, SseBlock, SyncDuty}; use execution_layer::{ BuilderParams, ChainHealth, ExecutionLayer, FailedCondition, PayloadAttributes, PayloadStatus, }; +pub use fork_choice::CountUnrealized; use fork_choice::{ AttestationFromBlock, ExecutionStatus, ForkChoice, ForkchoiceUpdateParameters, InvalidationOperation, PayloadVerificationStatus, ResetPayloadStatuses, @@ -98,8 +100,6 @@ use task_executor::{ShutdownReason, TaskExecutor}; use tree_hash::TreeHash; use types::beacon_state::CloneConfig; use types::*; -pub use crate::canonical_head::{CanonicalHead, CanonicalHeadRwLock}; -pub use fork_choice::CountUnrealized; pub type ForkChoiceError = fork_choice::Error; diff --git a/beacon_node/execution_layer/src/engine_api/http.rs b/beacon_node/execution_layer/src/engine_api/http.rs index 031abf721..9aa8289fc 100644 --- a/beacon_node/execution_layer/src/engine_api/http.rs +++ b/beacon_node/execution_layer/src/engine_api/http.rs @@ -8,7 +8,7 @@ use sensitive_url::SensitiveUrl; use serde::de::DeserializeOwned; use serde_json::json; use std::time::Duration; -use types::{EthSpec}; +use types::EthSpec; pub use deposit_log::{DepositLog, Log}; pub use reqwest::Client; diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index 6b6cef8e7..23a142a7e 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -21,8 +21,8 @@ use tokio_util::{ compat::{Compat, FuturesAsyncReadCompatExt}, }; use types::{ - BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockMerge, EthSpec, - ForkContext, ForkName, Hash256, MainnetEthSpec, Signature, SignedBeaconBlock, + BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockMerge, EthSpec, ForkContext, + ForkName, Hash256, MainnetEthSpec, Signature, SignedBeaconBlock, }; lazy_static! { diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 03feb267a..fafff57fd 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -16,9 +16,9 @@ use crate::types::{ subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, SnappyTransform, Subnet, SubnetDiscovery, }; +use crate::EnrExt; use crate::Eth2Enr; use crate::{error, metrics, Enr, NetworkGlobals, PubsubMessage, TopicHash}; -use crate::{EnrExt}; use api_types::{PeerRequestId, Request, RequestId, Response}; use futures::stream::StreamExt; use gossipsub_scoring_parameters::{lighthouse_gossip_thresholds, PeerScoreSettings}; @@ -35,7 +35,7 @@ use libp2p::swarm::{ConnectionLimits, Swarm, SwarmBuilder, SwarmEvent}; use libp2p::PeerId; use slog::{crit, debug, info, o, trace, warn}; use std::io::Write; -use std::path::{PathBuf}; +use std::path::PathBuf; use std::pin::Pin; use std::{ marker::PhantomData, diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index fc96767b3..97e18acc3 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -60,13 +60,13 @@ use std::task::Context; use std::time::Duration; use std::{cmp, collections::HashSet}; use task_executor::TaskExecutor; -use tokio::sync::{mpsc}; -use types::{ - Attestation, AttesterSlashing, Hash256, ProposerSlashing, - SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, - SubnetId, SyncCommitteeMessage, SyncSubnetId, -}; +use tokio::sync::mpsc; use types::signed_blobs_sidecar::SignedBlobsSidecar; +use types::{ + Attestation, AttesterSlashing, Hash256, ProposerSlashing, SignedAggregateAndProof, + SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, + SyncCommitteeMessage, SyncSubnetId, +}; use work_reprocessing_queue::{ spawn_reprocess_scheduler, QueuedAggregate, QueuedRpcBlock, QueuedUnaggregate, ReadyWork, }; @@ -788,7 +788,7 @@ pub enum Work { peer_id: PeerId, request_id: PeerRequestId, request: BlobsByRangeRequest, - } + }, } impl Work { @@ -812,7 +812,7 @@ impl Work { Work::Status { .. } => STATUS_PROCESSING, Work::BlocksByRangeRequest { .. } => BLOCKS_BY_RANGE_REQUEST, Work::BlocksByRootsRequest { .. } => BLOCKS_BY_ROOTS_REQUEST, - Work::BlobsByRangeRequest {..} => BLOBS_BY_RANGE_REQUEST, + Work::BlobsByRangeRequest { .. } => BLOBS_BY_RANGE_REQUEST, Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION, Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE, } @@ -1693,7 +1693,7 @@ impl BeaconProcessor { Work::BlobsByRangeRequest { peer_id, request_id, - request + request, } => task_spawner.spawn_blocking_with_manual_send_idle(move |send_idle_on_drop| { worker.handle_blobs_by_range_request( sub_executor, diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 18c792cb6..8454d83f2 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -17,12 +17,12 @@ use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use store::hot_cold_store::HotColdDBError; use tokio::sync::mpsc; -use types::{ - Attestation, AttesterSlashing, EthSpec, Hash256, IndexedAttestation, - ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, - SignedVoluntaryExit, Slot, SubnetId, SyncCommitteeMessage, SyncSubnetId, -}; use types::signed_blobs_sidecar::SignedBlobsSidecar; +use types::{ + Attestation, AttesterSlashing, EthSpec, Hash256, IndexedAttestation, ProposerSlashing, + SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, + Slot, SubnetId, SyncCommitteeMessage, SyncSubnetId, +}; use super::{ super::work_reprocessing_queue::{ @@ -959,31 +959,31 @@ impl Worker { Ok(block_root) => { metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL); - if reprocess_tx - .try_send(ReprocessQueueMessage::BlockImported(block_root)) - .is_err() - { - error!( + if reprocess_tx + .try_send(ReprocessQueueMessage::BlockImported(block_root)) + .is_err() + { + error!( self.log, "Failed to inform block import"; "source" => "gossip", "block_root" => ?block_root, ) - }; + }; - debug!( + debug!( self.log, "Gossipsub block processed"; "block" => ?block_root, "peer_id" => %peer_id ); - self.chain.recompute_head_at_current_slot().await; - } - Err(BlockError::ParentUnknown { .. }) => { - // Inform the sync manager to find parents for this block - // This should not occur. It should be checked by `should_forward_block` - error!( + self.chain.recompute_head_at_current_slot().await; + } + Err(BlockError::ParentUnknown { .. }) => { + // Inform the sync manager to find parents for this block + // This should not occur. It should be checked by `should_forward_block` + error!( self.log, "Block with unknown parent attempted to be processed"; "peer_id" => %peer_id @@ -996,28 +996,28 @@ impl Worker { "Failed to verify execution payload"; "error" => %e ); - }, - other => { - debug!( + } + other => { + debug!( self.log, "Invalid gossip beacon block"; "outcome" => ?other, "block root" => ?block_root, "block slot" => block.slot() ); - self.gossip_penalize_peer( - peer_id, - PeerAction::MidToleranceError, - "bad_gossip_block_ssz", - ); - trace!( + self.gossip_penalize_peer( + peer_id, + PeerAction::MidToleranceError, + "bad_gossip_block_ssz", + ); + trace!( self.log, "Invalid gossip beacon block ssz"; "ssz" => format_args!("0x{}", hex::encode(block.as_ssz_bytes())), ); - } - }; } + }; + } pub fn process_gossip_voluntary_exit( self, diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 2ef858eee..beaea3833 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -4,9 +4,9 @@ use crate::status::ToStatusMessage; use crate::sync::SyncMessage; use beacon_chain::{BeaconChainError, BeaconChainTypes, HistoricalBlockError, WhenSlotSkipped}; use itertools::process_results; +use lighthouse_network::rpc::methods::{BlobsByRangeRequest, MAX_REQUEST_BLOBS_SIDECARS}; use lighthouse_network::rpc::StatusMessage; use lighthouse_network::rpc::*; -use lighthouse_network::rpc::methods::{BlobsByRangeRequest, MAX_REQUEST_BLOBS_SIDECARS}; use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo}; use slog::{debug, error}; use slot_clock::SlotClock; @@ -384,141 +384,141 @@ impl Worker { mut req: BlobsByRangeRequest, ) { debug!(self.log, "Received BlobsByRange Request"; - "peer_id" => %peer_id, - "count" => req.count, - "start_slot" => req.start_slot, - ); + "peer_id" => %peer_id, + "count" => req.count, + "start_slot" => req.start_slot, + ); - // Should not send more than max request blocks - if req.count > MAX_REQUEST_BLOBS_SIDECARS { - req.count = MAX_REQUEST_BLOBS_SIDECARS; - } + // Should not send more than max request blocks + if req.count > MAX_REQUEST_BLOBS_SIDECARS { + req.count = MAX_REQUEST_BLOBS_SIDECARS; + } //FIXME(sean) create the blobs iter - // let forwards_block_root_iter = match self - // .chain - // .forwards_iter_block_roots(Slot::from(req.start_slot)) - // { - // Ok(iter) => iter, - // Err(BeaconChainError::HistoricalBlockError( - // HistoricalBlockError::BlockOutOfRange { - // slot, - // oldest_block_slot, - // }, - // )) => { - // debug!(self.log, "Range request failed during backfill"; "requested_slot" => slot, "oldest_known_slot" => oldest_block_slot); - // return self.send_error_response( - // peer_id, - // RPCResponseErrorCode::ResourceUnavailable, - // "Backfilling".into(), - // request_id, - // ); - // } - // Err(e) => return error!(self.log, "Unable to obtain root iter"; "error" => ?e), - // }; - // - // // Pick out the required blocks, ignoring skip-slots. - // let mut last_block_root = None; - // let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { - // iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count)) - // // map skip slots to None - // .map(|(root, _)| { - // let result = if Some(root) == last_block_root { - // None - // } else { - // Some(root) - // }; - // last_block_root = Some(root); - // result - // }) - // .collect::>>() - // }); - // - // let block_roots = match maybe_block_roots { - // Ok(block_roots) => block_roots, - // Err(e) => return error!(self.log, "Error during iteration over blocks"; "error" => ?e), - // }; - // - // // remove all skip slots - // let block_roots = block_roots.into_iter().flatten().collect::>(); - // - // // Fetching blocks is async because it may have to hit the execution layer for payloads. - // executor.spawn( - // async move { - // let mut blocks_sent = 0; - // let mut send_response = true; - // - // for root in block_roots { - // match self.chain.store.get_blobs(&root) { - // Ok(Some(blob)) => { - // blocks_sent += 1; - // self.send_network_message(NetworkMessage::SendResponse { - // peer_id, - // response: Response::BlobsByRange(Some(Arc::new(VariableList::new(vec![blob.message]).unwrap()))), - // id: request_id, - // }); - // } - // Ok(None) => { - // error!( - // self.log, - // "Blob in the chain is not in the store"; - // "request_root" => ?root - // ); - // break; - // } - // Err(e) => { - // error!( - // self.log, - // "Error fetching block for peer"; - // "block_root" => ?root, - // "error" => ?e - // ); - // break; - // } - // } - // } - // - // let current_slot = self - // .chain - // .slot() - // .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()); - // - // if blocks_sent < (req.count as usize) { - // debug!( - // self.log, - // "BlocksByRange Response processed"; - // "peer" => %peer_id, - // "msg" => "Failed to return all requested blocks", - // "start_slot" => req.start_slot, - // "current_slot" => current_slot, - // "requested" => req.count, - // "returned" => blocks_sent - // ); - // } else { - // debug!( - // self.log, - // "BlocksByRange Response processed"; - // "peer" => %peer_id, - // "start_slot" => req.start_slot, - // "current_slot" => current_slot, - // "requested" => req.count, - // "returned" => blocks_sent - // ); - // } - // - // if send_response { - // // send the stream terminator - // self.send_network_message(NetworkMessage::SendResponse { - // peer_id, - // response: Response::BlobsByRange(None), - // id: request_id, - // }); - // } - // - // drop(send_on_drop); - // }, - // "load_blocks_by_range_blocks", - // ); + // let forwards_block_root_iter = match self + // .chain + // .forwards_iter_block_roots(Slot::from(req.start_slot)) + // { + // Ok(iter) => iter, + // Err(BeaconChainError::HistoricalBlockError( + // HistoricalBlockError::BlockOutOfRange { + // slot, + // oldest_block_slot, + // }, + // )) => { + // debug!(self.log, "Range request failed during backfill"; "requested_slot" => slot, "oldest_known_slot" => oldest_block_slot); + // return self.send_error_response( + // peer_id, + // RPCResponseErrorCode::ResourceUnavailable, + // "Backfilling".into(), + // request_id, + // ); + // } + // Err(e) => return error!(self.log, "Unable to obtain root iter"; "error" => ?e), + // }; + // + // // Pick out the required blocks, ignoring skip-slots. + // let mut last_block_root = None; + // let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { + // iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count)) + // // map skip slots to None + // .map(|(root, _)| { + // let result = if Some(root) == last_block_root { + // None + // } else { + // Some(root) + // }; + // last_block_root = Some(root); + // result + // }) + // .collect::>>() + // }); + // + // let block_roots = match maybe_block_roots { + // Ok(block_roots) => block_roots, + // Err(e) => return error!(self.log, "Error during iteration over blocks"; "error" => ?e), + // }; + // + // // remove all skip slots + // let block_roots = block_roots.into_iter().flatten().collect::>(); + // + // // Fetching blocks is async because it may have to hit the execution layer for payloads. + // executor.spawn( + // async move { + // let mut blocks_sent = 0; + // let mut send_response = true; + // + // for root in block_roots { + // match self.chain.store.get_blobs(&root) { + // Ok(Some(blob)) => { + // blocks_sent += 1; + // self.send_network_message(NetworkMessage::SendResponse { + // peer_id, + // response: Response::BlobsByRange(Some(Arc::new(VariableList::new(vec![blob.message]).unwrap()))), + // id: request_id, + // }); + // } + // Ok(None) => { + // error!( + // self.log, + // "Blob in the chain is not in the store"; + // "request_root" => ?root + // ); + // break; + // } + // Err(e) => { + // error!( + // self.log, + // "Error fetching block for peer"; + // "block_root" => ?root, + // "error" => ?e + // ); + // break; + // } + // } + // } + // + // let current_slot = self + // .chain + // .slot() + // .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()); + // + // if blocks_sent < (req.count as usize) { + // debug!( + // self.log, + // "BlocksByRange Response processed"; + // "peer" => %peer_id, + // "msg" => "Failed to return all requested blocks", + // "start_slot" => req.start_slot, + // "current_slot" => current_slot, + // "requested" => req.count, + // "returned" => blocks_sent + // ); + // } else { + // debug!( + // self.log, + // "BlocksByRange Response processed"; + // "peer" => %peer_id, + // "start_slot" => req.start_slot, + // "current_slot" => current_slot, + // "requested" => req.count, + // "returned" => blocks_sent + // ); + // } + // + // if send_response { + // // send the stream terminator + // self.send_network_message(NetworkMessage::SendResponse { + // peer_id, + // response: Response::BlobsByRange(None), + // id: request_id, + // }); + // } + // + // drop(send_on_drop); + // }, + // "load_blocks_by_range_blocks", + // ); } } diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index be382efe7..c2cf483d9 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -6,8 +6,8 @@ use crate::status::status_message; use crate::sync::manager::RequestId as SyncId; use crate::sync::SyncMessage; use beacon_chain::{BeaconChain, BeaconChainTypes}; -use lighthouse_network::rpc::*; use lighthouse_network::rpc::methods::BlobsByRangeRequest; +use lighthouse_network::rpc::*; use lighthouse_network::{ Client, MessageId, NetworkGlobals, PeerId, PeerRequestId, Request, Response, }; @@ -17,12 +17,12 @@ use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use store::SyncCommitteeMessage; use tokio::sync::mpsc; +use types::signed_blobs_sidecar::SignedBlobsSidecar; use types::{ Attestation, AttesterSlashing, BlobsSidecar, EthSpec, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncSubnetId, }; -use types::signed_blobs_sidecar::SignedBlobsSidecar; /// Processes validated messages from the network. It relays necessary data to the syncing thread /// and processes blocks from the pubsub network. diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index d5dfb60fb..928669590 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -593,7 +593,7 @@ impl SyncManager { .parent_chain_processed(chain_hash, result, &mut self.network), }, //FIXME(sean) - SyncMessage::RpcBlob { .. } => todo!() + SyncMessage::RpcBlob { .. } => todo!(), } } diff --git a/consensus/types/src/blob.rs b/consensus/types/src/blob.rs index a1f86dab6..89e5e8bbe 100644 --- a/consensus/types/src/blob.rs +++ b/consensus/types/src/blob.rs @@ -1,7 +1,7 @@ use crate::bls_field_element::BlsFieldElement; use crate::test_utils::RngCore; use crate::test_utils::TestRandom; -use crate::{EthSpec}; +use crate::EthSpec; use serde::{Deserialize, Serialize}; use ssz::{Decode, DecodeError, Encode}; use ssz_types::VariableList; diff --git a/consensus/types/src/bls_field_element.rs b/consensus/types/src/bls_field_element.rs index 818c9df47..7654f65b3 100644 --- a/consensus/types/src/bls_field_element.rs +++ b/consensus/types/src/bls_field_element.rs @@ -1,4 +1,4 @@ -use crate::{Uint256}; +use crate::Uint256; use serde::{Deserialize, Serialize}; use ssz::{Decode, DecodeError, Encode}; use tree_hash::{PackedEncoding, TreeHash}; diff --git a/consensus/types/src/execution_payload.rs b/consensus/types/src/execution_payload.rs index 8b4ccfd3c..78a53a367 100644 --- a/consensus/types/src/execution_payload.rs +++ b/consensus/types/src/execution_payload.rs @@ -1,6 +1,4 @@ -use crate::{ - test_utils::TestRandom, *, -}; +use crate::{test_utils::TestRandom, *}; use derivative::Derivative; use serde_derive::{Deserialize, Serialize}; use ssz::Encode;