add context bytes to blob messages, fix rpc limits, sync past finalized checkpoint during finalized sync so we can advance our own finalization, fix stream termination bug in blobs by range
This commit is contained in:
parent
a67fa516c7
commit
ff772311fa
@ -439,6 +439,9 @@ fn context_bytes<T: EthSpec>(
|
||||
SignedBeaconBlock::Base { .. } => Some(fork_context.genesis_context_bytes()),
|
||||
};
|
||||
}
|
||||
if let RPCResponse::BlobsByRange(_) | RPCResponse::BlobsByRoot(_) = rpc_variant {
|
||||
return fork_context.to_context_bytes(ForkName::Eip4844);
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
@ -570,42 +573,38 @@ fn handle_v1_response<T: EthSpec>(
|
||||
SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes(decoded_buffer)?),
|
||||
)))),
|
||||
Protocol::BlobsByRange => {
|
||||
Ok(Some(RPCResponse::BlobsByRange(Arc::new(
|
||||
BlobsSidecar::from_ssz_bytes(decoded_buffer)?,
|
||||
))))
|
||||
//FIXME(sean) do we need context bytes?
|
||||
// let fork_name = fork_name.take().ok_or_else(|| {
|
||||
// RPCError::ErrorResponse(
|
||||
// RPCResponseErrorCode::InvalidRequest,
|
||||
// format!("No context bytes provided for {} response", protocol),
|
||||
// )
|
||||
// })?;
|
||||
// match fork_name {
|
||||
// ForkName::Eip4844 => ,
|
||||
// _ => Err(RPCError::ErrorResponse(
|
||||
// RPCResponseErrorCode::InvalidRequest,
|
||||
// "Invalid forkname for blobsbyrange".to_string(),
|
||||
// )),
|
||||
// }
|
||||
let fork_name = fork_name.take().ok_or_else(|| {
|
||||
RPCError::ErrorResponse(
|
||||
RPCResponseErrorCode::InvalidRequest,
|
||||
format!("No context bytes provided for {} response", protocol),
|
||||
)
|
||||
})?;
|
||||
match fork_name {
|
||||
ForkName::Eip4844 => Ok(Some(RPCResponse::BlobsByRange(Arc::new(
|
||||
BlobsSidecar::from_ssz_bytes(decoded_buffer)?,
|
||||
)))),
|
||||
_ => Err(RPCError::ErrorResponse(
|
||||
RPCResponseErrorCode::InvalidRequest,
|
||||
"Invalid forkname for blobsbyrange".to_string(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
Protocol::BlobsByRoot => {
|
||||
Ok(Some(RPCResponse::BlobsByRoot(Arc::new(
|
||||
SignedBeaconBlockAndBlobsSidecar::from_ssz_bytes(decoded_buffer)?,
|
||||
))))
|
||||
//FIXME(sean) do we need context bytes?
|
||||
// let fork_name = fork_name.take().ok_or_else(|| {
|
||||
// RPCError::ErrorResponse(
|
||||
// RPCResponseErrorCode::InvalidRequest,
|
||||
// format!("No context bytes provided for {} response", protocol),
|
||||
// )
|
||||
// })?;
|
||||
// match fork_name {
|
||||
// ForkName::Eip4844 =>
|
||||
// _ => Err(RPCError::ErrorResponse(
|
||||
// RPCResponseErrorCode::InvalidRequest,
|
||||
// "Invalid forkname for blobsbyroot".to_string(),
|
||||
// )),
|
||||
// }
|
||||
let fork_name = fork_name.take().ok_or_else(|| {
|
||||
RPCError::ErrorResponse(
|
||||
RPCResponseErrorCode::InvalidRequest,
|
||||
format!("No context bytes provided for {} response", protocol),
|
||||
)
|
||||
})?;
|
||||
match fork_name {
|
||||
ForkName::Eip4844 => Ok(Some(RPCResponse::BlobsByRoot(Arc::new(
|
||||
SignedBeaconBlockAndBlobsSidecar::from_ssz_bytes(decoded_buffer)?,
|
||||
)))),
|
||||
_ => Err(RPCError::ErrorResponse(
|
||||
RPCResponseErrorCode::InvalidRequest,
|
||||
"Invalid forkname for blobsbyroot".to_string(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
Protocol::Ping => Ok(Some(RPCResponse::Pong(Ping {
|
||||
data: u64::from_ssz_bytes(decoded_buffer)?,
|
||||
|
@ -23,7 +23,7 @@ use tokio_util::{
|
||||
use types::BlobsSidecar;
|
||||
use types::{
|
||||
BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockMerge, Blob, EmptyBlock, EthSpec,
|
||||
ForkContext, ForkName, Hash256, MainnetEthSpec, Signature, SignedBeaconBlock
|
||||
ForkContext, ForkName, Hash256, MainnetEthSpec, Signature, SignedBeaconBlock,
|
||||
};
|
||||
|
||||
lazy_static! {
|
||||
@ -364,16 +364,10 @@ impl ProtocolId {
|
||||
Protocol::Goodbye => RpcLimits::new(0, 0), // Goodbye request has no response
|
||||
Protocol::BlocksByRange => rpc_block_limits_by_fork(fork_context.current_fork()),
|
||||
Protocol::BlocksByRoot => rpc_block_limits_by_fork(fork_context.current_fork()),
|
||||
|
||||
Protocol::BlobsByRange => RpcLimits::new(
|
||||
*BLOBS_SIDECAR_MIN,
|
||||
*BLOBS_SIDECAR_MAX,
|
||||
),
|
||||
Protocol::BlobsByRoot => RpcLimits::new(
|
||||
*SIGNED_BLOCK_AND_BLOBS_MIN,
|
||||
*SIGNED_BLOCK_AND_BLOBS_MAX,
|
||||
),
|
||||
|
||||
Protocol::BlobsByRange => RpcLimits::new(*BLOBS_SIDECAR_MIN, *BLOBS_SIDECAR_MAX),
|
||||
Protocol::BlobsByRoot => {
|
||||
RpcLimits::new(*SIGNED_BLOCK_AND_BLOBS_MIN, *SIGNED_BLOCK_AND_BLOBS_MAX)
|
||||
}
|
||||
Protocol::Ping => RpcLimits::new(
|
||||
<Ping as Encode>::ssz_fixed_len(),
|
||||
<Ping as Encode>::ssz_fixed_len(),
|
||||
@ -392,13 +386,16 @@ impl ProtocolId {
|
||||
/// Returns `true` if the given `ProtocolId` should expect `context_bytes` in the
|
||||
/// beginning of the stream, else returns `false`.
|
||||
pub fn has_context_bytes(&self) -> bool {
|
||||
if self.version == Version::V2 {
|
||||
match self.message_name {
|
||||
match self.version {
|
||||
Version::V2 => match self.message_name {
|
||||
Protocol::BlocksByRange | Protocol::BlocksByRoot => return true,
|
||||
_ => return false,
|
||||
}
|
||||
},
|
||||
Version::V1 => match self.message_name {
|
||||
Protocol::BlobsByRange | Protocol::BlobsByRoot => return true,
|
||||
_ => return false,
|
||||
},
|
||||
}
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -689,7 +689,7 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
self.log,
|
||||
"BlobsByRange Response processed";
|
||||
"peer" => %peer_id,
|
||||
"msg" => "Failed to return all requested blocks",
|
||||
"msg" => "Failed to return all requested blobs",
|
||||
"start_slot" => req.start_slot,
|
||||
"current_slot" => current_slot,
|
||||
"requested" => req.count,
|
||||
|
@ -319,12 +319,18 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
let maybe_block_wrapped = info.pop_response().map(|block_sidecar_pair| {
|
||||
BlockWrapper::BlockAndBlob { block_sidecar_pair }
|
||||
});
|
||||
|
||||
if stream_terminator && !info.is_finished() {
|
||||
return None;
|
||||
}
|
||||
if !stream_terminator && maybe_block_wrapped.is_none() {
|
||||
return None;
|
||||
}
|
||||
|
||||
if info.is_finished() {
|
||||
entry.remove();
|
||||
}
|
||||
if !stream_terminator && maybe_block_wrapped.is_none() {
|
||||
return None
|
||||
}
|
||||
|
||||
Some((chain_id, batch_id, maybe_block_wrapped))
|
||||
}
|
||||
Entry::Vacant(_) => None,
|
||||
@ -365,12 +371,19 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
let maybe_block = info
|
||||
.pop_response()
|
||||
.map(|block_sidecar_pair| BlockWrapper::BlockAndBlob { block_sidecar_pair });
|
||||
|
||||
if stream_terminator && !info.is_finished() {
|
||||
return None;
|
||||
}
|
||||
|
||||
if !stream_terminator && maybe_block.is_none() {
|
||||
return None;
|
||||
}
|
||||
|
||||
if info.is_finished() {
|
||||
entry.remove();
|
||||
}
|
||||
if !stream_terminator && maybe_block.is_none() {
|
||||
return None
|
||||
}
|
||||
|
||||
Some((chain_id, batch_id, maybe_block))
|
||||
}
|
||||
Entry::Vacant(_) => None,
|
||||
|
@ -137,10 +137,16 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
|
||||
let id = SyncingChain::<T>::id(&target_head_root, &target_head_slot);
|
||||
|
||||
let target_slot = if is_finalized_segment {
|
||||
target_head_slot + (2 * T::EthSpec::slots_per_epoch()) + 1
|
||||
} else {
|
||||
target_head_slot
|
||||
};
|
||||
|
||||
SyncingChain {
|
||||
id,
|
||||
start_epoch,
|
||||
target_head_slot,
|
||||
target_head_slot: target_slot,
|
||||
target_head_root,
|
||||
batches: BTreeMap::new(),
|
||||
peers,
|
||||
|
@ -12,8 +12,8 @@
|
||||
"berlinBlock": 0,
|
||||
"londonBlock": 0,
|
||||
"mergeNetsplitBlock": 0,
|
||||
"shanghaiTime": 1671582851,
|
||||
"shardingForkTime": 1671582947,
|
||||
"shanghaiTime": 0,
|
||||
"shardingForkTime": 0,
|
||||
"terminalTotalDifficulty": 0
|
||||
},
|
||||
"alloc": {
|
||||
|
Loading…
Reference in New Issue
Block a user