Merge pull request #23 from divagant-martian/blob-syncing

make it compile to start from here
This commit is contained in:
realbigsean 2022-11-21 14:59:12 -05:00 committed by GitHub
commit a0d712ed8c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 106 additions and 62 deletions

View File

@ -1034,6 +1034,8 @@ pub fn serve<T: BeaconChainTypes>(
*/ */
// POST beacon/blocks // POST beacon/blocks
// TODO: THIS IS NOT THE RIGHT CODE
let post_beacon_blocks = eth_v1 let post_beacon_blocks = eth_v1
.and(warp::path("beacon")) .and(warp::path("beacon"))
.and(warp::path("blocks")) .and(warp::path("blocks"))
@ -1047,12 +1049,11 @@ pub fn serve<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>, chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>, network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| async move { log: Logger| async move {
publish_blocks::publish_block(None, block, None, chain, &network_tx, log) publish_blocks::publish_block(None, block, chain, &network_tx, log)
.await .await
.map(|()| warp::reply()) .map(|()| warp::reply())
}, },
); );
/* /*
* beacon/blocks * beacon/blocks
*/ */

View File

@ -1,7 +1,7 @@
use crate::metrics; use crate::metrics;
use beacon_chain::validator_monitor::{get_block_delay_ms, timestamp_now}; use beacon_chain::validator_monitor::{get_block_delay_ms, timestamp_now};
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, CountUnrealized}; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, CountUnrealized};
use lighthouse_network::{PubsubMessage, SignedBeaconBlockAndBlobsSidecar}; use lighthouse_network::PubsubMessage;
use network::NetworkMessage; use network::NetworkMessage;
use slog::{crit, error, info, warn, Logger}; use slog::{crit, error, info, warn, Logger};
use slot_clock::SlotClock; use slot_clock::SlotClock;
@ -31,6 +31,8 @@ pub async fn publish_block<T: BeaconChainTypes>(
// Send the block, regardless of whether or not it is valid. The API // Send the block, regardless of whether or not it is valid. The API
// specification is very clear that this is the desired behaviour. // specification is very clear that this is the desired behaviour.
let message = todo!("");
/*
let message = if matches!(block, SignedBeaconBlock::Eip4844(_)) { let message = if matches!(block, SignedBeaconBlock::Eip4844(_)) {
if let Some(sidecar) = chain.blob_cache.pop(&block_root) { if let Some(sidecar) = chain.blob_cache.pop(&block_root) {
PubsubMessage::BeaconBlockAndBlobsSidecars(Arc::new(SignedBeaconBlockAndBlobsSidecar { PubsubMessage::BeaconBlockAndBlobsSidecars(Arc::new(SignedBeaconBlockAndBlobsSidecar {
@ -44,6 +46,7 @@ pub async fn publish_block<T: BeaconChainTypes>(
} else { } else {
PubsubMessage::BeaconBlock(block.clone()) PubsubMessage::BeaconBlock(block.clone())
}; };
*/
crate::publish_pubsub_message(network_tx, message)?; crate::publish_pubsub_message(network_tx, message)?;
// Determine the delay after the start of the slot, register it with metrics. // Determine the delay after the start of the slot, register it with metrics.

View File

@ -957,7 +957,7 @@ mod tests {
OutboundRequest::BlobsByRange(blbrange) => { OutboundRequest::BlobsByRange(blbrange) => {
assert_eq!(decoded, InboundRequest::BlobsByRange(blbrange)) assert_eq!(decoded, InboundRequest::BlobsByRange(blbrange))
} }
OutboundRequest::BlobsByRoot(blbroot) => { OutboundRequest::BlobsByRoot(bbroot) => {
assert_eq!(decoded, InboundRequest::BlobsByRoot(bbroot)) assert_eq!(decoded, InboundRequest::BlobsByRoot(bbroot))
} }
OutboundRequest::Ping(ping) => { OutboundRequest::Ping(ping) => {

View File

@ -1334,6 +1334,11 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
Work::UnknownBlockAggregate { .. } => { Work::UnknownBlockAggregate { .. } => {
unknown_block_aggregate_queue.push(work) unknown_block_aggregate_queue.push(work)
} }
Work::BlobsByRootsRequest {
peer_id,
request_id,
request,
} => todo!(),
} }
} }
} }

View File

@ -842,7 +842,8 @@ impl<T: BeaconChainTypes> Worker<T> {
"gossip_block_low", "gossip_block_low",
); );
return None; return None;
} }
Err(blob_errors) => unimplemented!("handle")
}; };
metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL); metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL);

View File

@ -499,6 +499,7 @@ impl<T: BeaconChainTypes> Worker<T> {
//FIXME(sean) create the blobs iter //FIXME(sean) create the blobs iter
/*
let forwards_blob_root_iter = match self let forwards_blob_root_iter = match self
.chain .chain
.forwards_iter_block_roots(Slot::from(req.start_slot)) .forwards_iter_block_roots(Slot::from(req.start_slot))
@ -511,12 +512,13 @@ impl<T: BeaconChainTypes> Worker<T> {
}, },
)) => { )) => {
debug!(self.log, "Range request failed during backfill"; "requested_slot" => slot, "oldest_known_slot" => oldest_block_slot); debug!(self.log, "Range request failed during backfill"; "requested_slot" => slot, "oldest_known_slot" => oldest_block_slot);
return self.send_error_response( // return self.send_error_response(
peer_id, // peer_id,
RPCResponseErrorCode::ResourceUnavailable, // RPCResponseErrorCode::ResourceUnavailable,
"Backfilling".into(), // "Backfilling".into(),
request_id, // request_id,
); // );
todo!("stuff")
} }
Err(e) => return error!(self.log, "Unable to obtain root iter"; "error" => ?e), Err(e) => return error!(self.log, "Unable to obtain root iter"; "error" => ?e),
}; };
@ -546,7 +548,9 @@ impl<T: BeaconChainTypes> Worker<T> {
// remove all skip slots // remove all skip slots
let block_roots = block_roots.into_iter().flatten().collect::<Vec<_>>(); let block_roots = block_roots.into_iter().flatten().collect::<Vec<_>>();
*/
// Fetching blocks is async because it may have to hit the execution layer for payloads. // Fetching blocks is async because it may have to hit the execution layer for payloads.
/*
executor.spawn( executor.spawn(
async move { async move {
let mut blocks_sent = 0; let mut blocks_sent = 0;
@ -623,5 +627,7 @@ impl<T: BeaconChainTypes> Worker<T> {
}, },
"load_blocks_by_range_blocks", "load_blocks_by_range_blocks",
); );
*/
unimplemented!("")
} }
} }

View File

@ -210,6 +210,7 @@ impl<T: BeaconChainTypes> Processor<T> {
unreachable!("Block lookups do not request BBRange requests") unreachable!("Block lookups do not request BBRange requests")
} }
id @ (SyncId::BackFillSync { .. } | SyncId::RangeSync { .. }) => id, id @ (SyncId::BackFillSync { .. } | SyncId::RangeSync { .. }) => id,
SyncId::RangeBlockBlob { id } => unimplemented!("do it"),
}, },
RequestId::Router => unreachable!("All BBRange requests belong to sync"), RequestId::Router => unreachable!("All BBRange requests belong to sync"),
}; };
@ -268,6 +269,8 @@ impl<T: BeaconChainTypes> Processor<T> {
SyncId::BackFillSync { .. } | SyncId::RangeSync { .. } => { SyncId::BackFillSync { .. } | SyncId::RangeSync { .. } => {
unreachable!("Batch syncing do not request BBRoot requests") unreachable!("Batch syncing do not request BBRoot requests")
} }
SyncId::RangeBlockBlob { id } => unimplemented!("do it"),
}, },
RequestId::Router => unreachable!("All BBRoot requests belong to sync"), RequestId::Router => unreachable!("All BBRoot requests belong to sync"),
}; };
@ -298,6 +301,8 @@ impl<T: BeaconChainTypes> Processor<T> {
SyncId::BackFillSync { .. } | SyncId::RangeSync { .. } => { SyncId::BackFillSync { .. } | SyncId::RangeSync { .. } => {
unreachable!("Batch syncing do not request BBRoot requests") unreachable!("Batch syncing do not request BBRoot requests")
} }
SyncId::RangeBlockBlob { id } => unimplemented!("do it"),
}, },
RequestId::Router => unreachable!("All BBRoot requests belong to sync"), RequestId::Router => unreachable!("All BBRoot requests belong to sync"),
}; };

View File

@ -39,7 +39,7 @@ use super::network_context::SyncNetworkContext;
use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::peer_sync_info::{remote_sync_type, PeerSyncType};
use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH};
use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent}; use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent};
use crate::service::{NetworkMessage, RequestId}; use crate::service::NetworkMessage;
use crate::status::ToStatusMessage; use crate::status::ToStatusMessage;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, EngineState}; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, EngineState};
use futures::StreamExt; use futures::StreamExt;
@ -68,6 +68,17 @@ pub const SLOT_IMPORT_TOLERANCE: usize = 32;
pub type Id = u32; pub type Id = u32;
#[derive(Debug)]
pub struct SeansBlob {}
#[derive(Debug)]
pub struct SeansBlock {}
#[derive(Debug)]
pub struct SeansBlockBlob {
blob: SeansBlob,
block: SeansBlock,
}
/// Id of rpc requests sent by sync to the network. /// Id of rpc requests sent by sync to the network.
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub enum RequestId { pub enum RequestId {
@ -312,8 +323,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
} }
} }
RequestId::RangeBlockBlob { id } => { RequestId::RangeBlockBlob { id } => {
if let Some((chain_id, batch_id)) = self.network.fail_block_bob_request(request_id) if let Some((chain_id, batch_id)) = self.network.fail_block_bob_request(id) {
{
self.range_sync.inject_error( self.range_sync.inject_error(
&mut self.network, &mut self.network,
peer_id, peer_id,
@ -617,6 +627,18 @@ impl<T: BeaconChainTypes> SyncManager<T> {
.block_lookups .block_lookups
.parent_chain_processed(chain_hash, result, &mut self.network), .parent_chain_processed(chain_hash, result, &mut self.network),
}, },
SyncMessage::RpcBlob {
request_id,
peer_id,
blob_sidecar,
seen_timestamp,
} => todo!(),
SyncMessage::RpcBlockAndBlob {
request_id,
peer_id,
block_and_blobs,
seen_timestamp,
} => todo!(),
} }
} }
@ -736,7 +758,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
} }
RequestId::RangeBlockBlob { id } => { RequestId::RangeBlockBlob { id } => {
// do stuff // do stuff
self.network.block_blob_block_response(id, block); // self.network.block_blob_block_response(id, block);
} }
} }
} }
@ -749,10 +771,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
seen_timestamp: Duration, seen_timestamp: Duration,
) { ) {
let RequestId::RangeBlockBlob { id } = request_id else { let RequestId::RangeBlockBlob { id } = request_id else {
return error!("bad stuff"); panic!("Wrong things going on ");
}; };
// get the paired block blob from the network context and send it to range // get the paired block blob from the network context and send it to range
self.network.block_blob_blob_response(request_id, blob)
} }
} }

View File

@ -178,10 +178,10 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
request: blocks_request, request: blocks_request,
request_id, request_id,
}) })
.and_then(|| { .and_then(|_| {
self.send_network_msg(NetworkMessage::SendRequest { self.send_network_msg(NetworkMessage::SendRequest {
peer_id, peer_id,
request: blocks_request, request: blobs_request,
request_id, request_id,
}) })
})?; })?;
@ -247,55 +247,57 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
request_id: Id, request_id: Id,
block: Option<SeansBlock>, block: Option<SeansBlock>,
) -> Option<(ChainId, BatchId, Option<SeansBlockBlob>)> { ) -> Option<(ChainId, BatchId, Option<SeansBlockBlob>)> {
let (chain_id, batch_id, info) = self.block_blob_requests.get_mut(&request_id)?; unimplemented!()
let response = match block { // let (chain_id, batch_id, info) = self.block_blob_requests.get_mut(&request_id)?;
Some(block) => match info.accumulated_blobs.pop_front() { // match block {
Some(blob) => Some(SeansBlockBlob { block, blob }), // Some(block) => match info.accumulated_blobs.pop_front() {
None => { // Some(blob) => Some(SeansBlockBlob { block, blob }),
// accumulate the block // None => {
info.accumulated_blocks.push_back(block); // // accumulate the block
None // info.accumulated_blocks.push_back(block);
} // None
}, // }
None => { // },
info.is_blocks_rpc_finished = true; // None => {
// info.is_blocks_rpc_finished = true;
if info.is_blobs_rpc_finished && info.is_blocks_rpc_finished { //
// this is the coupled stream termination // if info.is_blobs_rpc_finished && info.is_blocks_rpc_finished {
Some((chain_id, batch_id, None)) // // this is the coupled stream termination
} else { // Some((chain_id, batch_id, None))
None // } else {
} // None
} // }
}; // }
// }
} }
pub fn block_blob_blob_response( pub fn block_blob_blob_response(
&mut self, &mut self,
request_id: Id, request_id: Id,
blob: Option<SeansBlob>, blob: Option<SeansBlob>,
) -> Option<(ChainId, Option<SeansBlockBlob>)> { ) -> Option<(ChainId, BatchId, Option<SeansBlockBlob>)> {
let (chain_id, info) = self.block_blob_requests.get_mut(&request_id)?; // let (batch_id, chain_id, info) = self.block_blob_requests.get_mut(&request_id)?;
let response = match blob { // match blob {
Some(blob) => match info.accumulated_blocks.pop_front() { // Some(blob) => match info.accumulated_blocks.pop_front() {
Some(block) => Some(SeansBlockBlob { block, blob }), // Some(block) => Some(SeansBlockBlob { block, blob }),
None => { // None => {
// accumulate the blob // // accumulate the blob
info.accumulated_blobs.push_back(blob); // info.accumulated_blobs.push_back(blob);
None // None
} // }
}, // },
None => { // None => {
info.is_blobs_rpc_finished = true; // info.is_blobs_rpc_finished = true;
//
if info.is_blobs_rpc_finished && info.is_blocks_rpc_finished { // if info.is_blobs_rpc_finished && info.is_blocks_rpc_finished {
// this is the coupled stream termination // // this is the coupled stream termination
Some((chain_id, batch_id, None)) // Some((chain_id, batch_id, None))
} else { // } else {
None // None
} // }
} // }
}; // }
unimplemented!("do it")
} }
/// Received a blocks by range response. /// Received a blocks by range response.