Send success code for duplicate blocks on HTTP (#4655)
## Issue Addressed Closes #4473 (take 3) ## Proposed Changes - Send a 202 status code by default for duplicate blocks, instead of 400. This conveys to the caller that the block was published, but makes no guarantees about its validity. Block relays can count this as a success or a failure as they wish. - For users wanting finer-grained control over which status is returned for duplicates, a flag `--http-duplicate-block-status` can be used to adjust the behaviour. A 400 status can be supplied to restore the old (spec-compliant) behaviour, or a 200 status can be used to silence VCs that warn loudly for non-200 codes (e.g. Lighthouse prior to v4.4.0). - Update the Lighthouse VC to gracefully handle success codes other than 200. The info message isn't the nicest thing to read, but it covers all bases and isn't a nasty `ERRO`/`CRIT` that will wake anyone up. ## Additional Info I'm planning to raise a PR to `beacon-APIs` to specify that clients may return 202 for duplicate blocks. Really it would be nice to use some 2xx code that _isn't_ the same as the code for "published but invalid". I think unfortunately there aren't any suitable codes, and maybe the best fit is `409 CONFLICT`. Given that we need to fix this promptly for our release, I think using the 202 code temporarily with configuration strikes a nice compromise.
This commit is contained in:
parent
c258270d6a
commit
8e95b69a1a
@ -139,6 +139,8 @@ pub struct Config {
|
|||||||
pub data_dir: PathBuf,
|
pub data_dir: PathBuf,
|
||||||
pub sse_capacity_multiplier: usize,
|
pub sse_capacity_multiplier: usize,
|
||||||
pub enable_beacon_processor: bool,
|
pub enable_beacon_processor: bool,
|
||||||
|
#[serde(with = "eth2::types::serde_status_code")]
|
||||||
|
pub duplicate_block_status_code: StatusCode,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for Config {
|
impl Default for Config {
|
||||||
@ -154,6 +156,7 @@ impl Default for Config {
|
|||||||
data_dir: PathBuf::from(DEFAULT_ROOT_DIR),
|
data_dir: PathBuf::from(DEFAULT_ROOT_DIR),
|
||||||
sse_capacity_multiplier: 1,
|
sse_capacity_multiplier: 1,
|
||||||
enable_beacon_processor: true,
|
enable_beacon_processor: true,
|
||||||
|
duplicate_block_status_code: StatusCode::ACCEPTED,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -510,6 +513,8 @@ pub fn serve<T: BeaconChainTypes>(
|
|||||||
let task_spawner_filter =
|
let task_spawner_filter =
|
||||||
warp::any().map(move || TaskSpawner::new(beacon_processor_send.clone()));
|
warp::any().map(move || TaskSpawner::new(beacon_processor_send.clone()));
|
||||||
|
|
||||||
|
let duplicate_block_status_code = ctx.config.duplicate_block_status_code;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
*
|
*
|
||||||
* Start of HTTP method definitions.
|
* Start of HTTP method definitions.
|
||||||
@ -1284,7 +1289,7 @@ pub fn serve<T: BeaconChainTypes>(
|
|||||||
.and(network_tx_filter.clone())
|
.and(network_tx_filter.clone())
|
||||||
.and(log_filter.clone())
|
.and(log_filter.clone())
|
||||||
.then(
|
.then(
|
||||||
|block: Arc<SignedBeaconBlock<T::EthSpec>>,
|
move |block: Arc<SignedBeaconBlock<T::EthSpec>>,
|
||||||
task_spawner: TaskSpawner<T::EthSpec>,
|
task_spawner: TaskSpawner<T::EthSpec>,
|
||||||
chain: Arc<BeaconChain<T>>,
|
chain: Arc<BeaconChain<T>>,
|
||||||
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
|
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
|
||||||
@ -1297,9 +1302,9 @@ pub fn serve<T: BeaconChainTypes>(
|
|||||||
&network_tx,
|
&network_tx,
|
||||||
log,
|
log,
|
||||||
BroadcastValidation::default(),
|
BroadcastValidation::default(),
|
||||||
|
duplicate_block_status_code,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.map(|()| warp::reply().into_response())
|
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
@ -1314,7 +1319,7 @@ pub fn serve<T: BeaconChainTypes>(
|
|||||||
.and(network_tx_filter.clone())
|
.and(network_tx_filter.clone())
|
||||||
.and(log_filter.clone())
|
.and(log_filter.clone())
|
||||||
.then(
|
.then(
|
||||||
|block_bytes: Bytes,
|
move |block_bytes: Bytes,
|
||||||
task_spawner: TaskSpawner<T::EthSpec>,
|
task_spawner: TaskSpawner<T::EthSpec>,
|
||||||
chain: Arc<BeaconChain<T>>,
|
chain: Arc<BeaconChain<T>>,
|
||||||
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
|
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
|
||||||
@ -1334,9 +1339,9 @@ pub fn serve<T: BeaconChainTypes>(
|
|||||||
&network_tx,
|
&network_tx,
|
||||||
log,
|
log,
|
||||||
BroadcastValidation::default(),
|
BroadcastValidation::default(),
|
||||||
|
duplicate_block_status_code,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.map(|()| warp::reply().into_response())
|
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
@ -1352,7 +1357,7 @@ pub fn serve<T: BeaconChainTypes>(
|
|||||||
.and(network_tx_filter.clone())
|
.and(network_tx_filter.clone())
|
||||||
.and(log_filter.clone())
|
.and(log_filter.clone())
|
||||||
.then(
|
.then(
|
||||||
|validation_level: api_types::BroadcastValidationQuery,
|
move |validation_level: api_types::BroadcastValidationQuery,
|
||||||
block: Arc<SignedBeaconBlock<T::EthSpec>>,
|
block: Arc<SignedBeaconBlock<T::EthSpec>>,
|
||||||
task_spawner: TaskSpawner<T::EthSpec>,
|
task_spawner: TaskSpawner<T::EthSpec>,
|
||||||
chain: Arc<BeaconChain<T>>,
|
chain: Arc<BeaconChain<T>>,
|
||||||
@ -1366,9 +1371,9 @@ pub fn serve<T: BeaconChainTypes>(
|
|||||||
&network_tx,
|
&network_tx,
|
||||||
log,
|
log,
|
||||||
validation_level.broadcast_validation,
|
validation_level.broadcast_validation,
|
||||||
|
duplicate_block_status_code,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.map(|()| warp::reply().into_response())
|
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
@ -1384,7 +1389,7 @@ pub fn serve<T: BeaconChainTypes>(
|
|||||||
.and(network_tx_filter.clone())
|
.and(network_tx_filter.clone())
|
||||||
.and(log_filter.clone())
|
.and(log_filter.clone())
|
||||||
.then(
|
.then(
|
||||||
|validation_level: api_types::BroadcastValidationQuery,
|
move |validation_level: api_types::BroadcastValidationQuery,
|
||||||
block_bytes: Bytes,
|
block_bytes: Bytes,
|
||||||
task_spawner: TaskSpawner<T::EthSpec>,
|
task_spawner: TaskSpawner<T::EthSpec>,
|
||||||
chain: Arc<BeaconChain<T>>,
|
chain: Arc<BeaconChain<T>>,
|
||||||
@ -1405,9 +1410,9 @@ pub fn serve<T: BeaconChainTypes>(
|
|||||||
&network_tx,
|
&network_tx,
|
||||||
log,
|
log,
|
||||||
validation_level.broadcast_validation,
|
validation_level.broadcast_validation,
|
||||||
|
duplicate_block_status_code,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.map(|()| warp::reply().into_response())
|
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
@ -1427,7 +1432,7 @@ pub fn serve<T: BeaconChainTypes>(
|
|||||||
.and(network_tx_filter.clone())
|
.and(network_tx_filter.clone())
|
||||||
.and(log_filter.clone())
|
.and(log_filter.clone())
|
||||||
.then(
|
.then(
|
||||||
|block: SignedBeaconBlock<T::EthSpec, BlindedPayload<_>>,
|
move |block: SignedBlindedBeaconBlock<T::EthSpec>,
|
||||||
task_spawner: TaskSpawner<T::EthSpec>,
|
task_spawner: TaskSpawner<T::EthSpec>,
|
||||||
chain: Arc<BeaconChain<T>>,
|
chain: Arc<BeaconChain<T>>,
|
||||||
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
|
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
|
||||||
@ -1439,9 +1444,9 @@ pub fn serve<T: BeaconChainTypes>(
|
|||||||
&network_tx,
|
&network_tx,
|
||||||
log,
|
log,
|
||||||
BroadcastValidation::default(),
|
BroadcastValidation::default(),
|
||||||
|
duplicate_block_status_code,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.map(|()| warp::reply().into_response())
|
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
@ -1457,13 +1462,13 @@ pub fn serve<T: BeaconChainTypes>(
|
|||||||
.and(network_tx_filter.clone())
|
.and(network_tx_filter.clone())
|
||||||
.and(log_filter.clone())
|
.and(log_filter.clone())
|
||||||
.then(
|
.then(
|
||||||
|block_bytes: Bytes,
|
move |block_bytes: Bytes,
|
||||||
task_spawner: TaskSpawner<T::EthSpec>,
|
task_spawner: TaskSpawner<T::EthSpec>,
|
||||||
chain: Arc<BeaconChain<T>>,
|
chain: Arc<BeaconChain<T>>,
|
||||||
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
|
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
|
||||||
log: Logger| {
|
log: Logger| {
|
||||||
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
|
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
|
||||||
let block = SignedBeaconBlock::<T::EthSpec, BlindedPayload<_>>::from_ssz_bytes(
|
let block = SignedBlindedBeaconBlock::<T::EthSpec>::from_ssz_bytes(
|
||||||
&block_bytes,
|
&block_bytes,
|
||||||
&chain.spec,
|
&chain.spec,
|
||||||
)
|
)
|
||||||
@ -1476,9 +1481,9 @@ pub fn serve<T: BeaconChainTypes>(
|
|||||||
&network_tx,
|
&network_tx,
|
||||||
log,
|
log,
|
||||||
BroadcastValidation::default(),
|
BroadcastValidation::default(),
|
||||||
|
duplicate_block_status_code,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.map(|()| warp::reply().into_response())
|
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
@ -1494,85 +1499,61 @@ pub fn serve<T: BeaconChainTypes>(
|
|||||||
.and(network_tx_filter.clone())
|
.and(network_tx_filter.clone())
|
||||||
.and(log_filter.clone())
|
.and(log_filter.clone())
|
||||||
.then(
|
.then(
|
||||||
|validation_level: api_types::BroadcastValidationQuery,
|
move |validation_level: api_types::BroadcastValidationQuery,
|
||||||
block: SignedBeaconBlock<T::EthSpec, BlindedPayload<_>>,
|
block: SignedBlindedBeaconBlock<T::EthSpec>,
|
||||||
task_spawner: TaskSpawner<T::EthSpec>,
|
task_spawner: TaskSpawner<T::EthSpec>,
|
||||||
chain: Arc<BeaconChain<T>>,
|
chain: Arc<BeaconChain<T>>,
|
||||||
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
|
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
|
||||||
log: Logger| {
|
log: Logger| {
|
||||||
task_spawner.spawn_async(Priority::P0, async move {
|
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
|
||||||
match publish_blocks::publish_blinded_block(
|
publish_blocks::publish_blinded_block(
|
||||||
block,
|
block,
|
||||||
chain,
|
chain,
|
||||||
&network_tx,
|
&network_tx,
|
||||||
log,
|
log,
|
||||||
validation_level.broadcast_validation,
|
validation_level.broadcast_validation,
|
||||||
|
duplicate_block_status_code,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
{
|
|
||||||
Ok(()) => warp::reply().into_response(),
|
|
||||||
Err(e) => match warp_utils::reject::handle_rejection(e).await {
|
|
||||||
Ok(reply) => reply.into_response(),
|
|
||||||
Err(_) => warp::reply::with_status(
|
|
||||||
StatusCode::INTERNAL_SERVER_ERROR,
|
|
||||||
eth2::StatusCode::INTERNAL_SERVER_ERROR,
|
|
||||||
)
|
|
||||||
.into_response(),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
let post_beacon_blinded_blocks_v2_ssz =
|
let post_beacon_blinded_blocks_v2_ssz = eth_v2
|
||||||
eth_v2
|
|
||||||
.and(warp::path("beacon"))
|
.and(warp::path("beacon"))
|
||||||
.and(warp::path("blinded_blocks"))
|
.and(warp::path("blinded_blocks"))
|
||||||
.and(warp::query::<api_types::BroadcastValidationQuery>())
|
.and(warp::query::<api_types::BroadcastValidationQuery>())
|
||||||
.and(warp::path::end())
|
.and(warp::path::end())
|
||||||
.and(warp::body::bytes())
|
.and(warp::body::bytes())
|
||||||
|
.and(task_spawner_filter.clone())
|
||||||
.and(chain_filter.clone())
|
.and(chain_filter.clone())
|
||||||
.and(network_tx_filter.clone())
|
.and(network_tx_filter.clone())
|
||||||
.and(log_filter.clone())
|
.and(log_filter.clone())
|
||||||
.then(
|
.then(
|
||||||
|validation_level: api_types::BroadcastValidationQuery,
|
move |validation_level: api_types::BroadcastValidationQuery,
|
||||||
block_bytes: Bytes,
|
block_bytes: Bytes,
|
||||||
|
task_spawner: TaskSpawner<T::EthSpec>,
|
||||||
chain: Arc<BeaconChain<T>>,
|
chain: Arc<BeaconChain<T>>,
|
||||||
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
|
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
|
||||||
log: Logger| async move {
|
log: Logger| {
|
||||||
let block =
|
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
|
||||||
match SignedBeaconBlock::<T::EthSpec, BlindedPayload<_>>::from_ssz_bytes(
|
let block = SignedBlindedBeaconBlock::<T::EthSpec>::from_ssz_bytes(
|
||||||
&block_bytes,
|
&block_bytes,
|
||||||
&chain.spec,
|
&chain.spec,
|
||||||
) {
|
|
||||||
Ok(data) => data,
|
|
||||||
Err(_) => {
|
|
||||||
return warp::reply::with_status(
|
|
||||||
StatusCode::BAD_REQUEST,
|
|
||||||
eth2::StatusCode::BAD_REQUEST,
|
|
||||||
)
|
)
|
||||||
.into_response();
|
.map_err(|e| {
|
||||||
}
|
warp_utils::reject::custom_bad_request(format!("invalid SSZ: {e:?}"))
|
||||||
};
|
})?;
|
||||||
match publish_blocks::publish_blinded_block(
|
publish_blocks::publish_blinded_block(
|
||||||
block,
|
block,
|
||||||
chain,
|
chain,
|
||||||
&network_tx,
|
&network_tx,
|
||||||
log,
|
log,
|
||||||
validation_level.broadcast_validation,
|
validation_level.broadcast_validation,
|
||||||
|
duplicate_block_status_code,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
{
|
})
|
||||||
Ok(()) => warp::reply().into_response(),
|
|
||||||
Err(e) => match warp_utils::reject::handle_rejection(e).await {
|
|
||||||
Ok(reply) => reply.into_response(),
|
|
||||||
Err(_) => warp::reply::with_status(
|
|
||||||
StatusCode::INTERNAL_SERVER_ERROR,
|
|
||||||
eth2::StatusCode::INTERNAL_SERVER_ERROR,
|
|
||||||
)
|
|
||||||
.into_response(),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -4,7 +4,7 @@ use beacon_chain::{
|
|||||||
BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, IntoGossipVerifiedBlock,
|
BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, IntoGossipVerifiedBlock,
|
||||||
NotifyExecutionLayer,
|
NotifyExecutionLayer,
|
||||||
};
|
};
|
||||||
use eth2::types::BroadcastValidation;
|
use eth2::types::{BroadcastValidation, ErrorMessage};
|
||||||
use execution_layer::ProvenancedPayload;
|
use execution_layer::ProvenancedPayload;
|
||||||
use lighthouse_network::PubsubMessage;
|
use lighthouse_network::PubsubMessage;
|
||||||
use network::NetworkMessage;
|
use network::NetworkMessage;
|
||||||
@ -19,7 +19,8 @@ use types::{
|
|||||||
AbstractExecPayload, BeaconBlockRef, BlindedPayload, EthSpec, ExecPayload, ExecutionBlockHash,
|
AbstractExecPayload, BeaconBlockRef, BlindedPayload, EthSpec, ExecPayload, ExecutionBlockHash,
|
||||||
FullPayload, Hash256, SignedBeaconBlock,
|
FullPayload, Hash256, SignedBeaconBlock,
|
||||||
};
|
};
|
||||||
use warp::Rejection;
|
use warp::http::StatusCode;
|
||||||
|
use warp::{reply::Response, Rejection, Reply};
|
||||||
|
|
||||||
pub enum ProvenancedBlock<T: BeaconChainTypes, B: IntoGossipVerifiedBlock<T>> {
|
pub enum ProvenancedBlock<T: BeaconChainTypes, B: IntoGossipVerifiedBlock<T>> {
|
||||||
/// The payload was built using a local EE.
|
/// The payload was built using a local EE.
|
||||||
@ -47,7 +48,8 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlock<T>>(
|
|||||||
network_tx: &UnboundedSender<NetworkMessage<T::EthSpec>>,
|
network_tx: &UnboundedSender<NetworkMessage<T::EthSpec>>,
|
||||||
log: Logger,
|
log: Logger,
|
||||||
validation_level: BroadcastValidation,
|
validation_level: BroadcastValidation,
|
||||||
) -> Result<(), Rejection> {
|
duplicate_status_code: StatusCode,
|
||||||
|
) -> Result<Response, Rejection> {
|
||||||
let seen_timestamp = timestamp_now();
|
let seen_timestamp = timestamp_now();
|
||||||
let (block, is_locally_built_block) = match provenanced_block {
|
let (block, is_locally_built_block) = match provenanced_block {
|
||||||
ProvenancedBlock::Local(block, _) => (block, true),
|
ProvenancedBlock::Local(block, _) => (block, true),
|
||||||
@ -75,10 +77,30 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlock<T>>(
|
|||||||
};
|
};
|
||||||
|
|
||||||
/* if we can form a `GossipVerifiedBlock`, we've passed our basic gossip checks */
|
/* if we can form a `GossipVerifiedBlock`, we've passed our basic gossip checks */
|
||||||
let gossip_verified_block = block.into_gossip_verified_block(&chain).map_err(|e| {
|
let gossip_verified_block = match block.into_gossip_verified_block(&chain) {
|
||||||
warn!(log, "Not publishing block, not gossip verified"; "slot" => beacon_block.slot(), "error" => ?e);
|
Ok(b) => b,
|
||||||
warp_utils::reject::custom_bad_request(e.to_string())
|
Err(BlockError::BlockIsAlreadyKnown) => {
|
||||||
})?;
|
// Allow the status code for duplicate blocks to be overridden based on config.
|
||||||
|
return Ok(warp::reply::with_status(
|
||||||
|
warp::reply::json(&ErrorMessage {
|
||||||
|
code: duplicate_status_code.as_u16(),
|
||||||
|
message: "duplicate block".to_string(),
|
||||||
|
stacktraces: vec![],
|
||||||
|
}),
|
||||||
|
duplicate_status_code,
|
||||||
|
)
|
||||||
|
.into_response());
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!(
|
||||||
|
log,
|
||||||
|
"Not publishing block - not gossip verified";
|
||||||
|
"slot" => beacon_block.slot(),
|
||||||
|
"error" => ?e
|
||||||
|
);
|
||||||
|
return Err(warp_utils::reject::custom_bad_request(e.to_string()));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
let block_root = block_root.unwrap_or(gossip_verified_block.block_root);
|
let block_root = block_root.unwrap_or(gossip_verified_block.block_root);
|
||||||
|
|
||||||
@ -167,8 +189,7 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlock<T>>(
|
|||||||
&log,
|
&log,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
Ok(warp::reply().into_response())
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
Err(BlockError::BeaconChainError(BeaconChainError::UnableToPublish)) => {
|
Err(BlockError::BeaconChainError(BeaconChainError::UnableToPublish)) => {
|
||||||
Err(warp_utils::reject::custom_server_error(
|
Err(warp_utils::reject::custom_server_error(
|
||||||
@ -178,10 +199,6 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlock<T>>(
|
|||||||
Err(BlockError::Slashable) => Err(warp_utils::reject::custom_bad_request(
|
Err(BlockError::Slashable) => Err(warp_utils::reject::custom_bad_request(
|
||||||
"proposal for this slot and proposer has already been seen".to_string(),
|
"proposal for this slot and proposer has already been seen".to_string(),
|
||||||
)),
|
)),
|
||||||
Err(BlockError::BlockIsAlreadyKnown) => {
|
|
||||||
info!(log, "Block from HTTP API already known"; "block" => ?block_root);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
if let BroadcastValidation::Gossip = validation_level {
|
if let BroadcastValidation::Gossip = validation_level {
|
||||||
Err(warp_utils::reject::broadcast_without_import(format!("{e}")))
|
Err(warp_utils::reject::broadcast_without_import(format!("{e}")))
|
||||||
@ -208,7 +225,8 @@ pub async fn publish_blinded_block<T: BeaconChainTypes>(
|
|||||||
network_tx: &UnboundedSender<NetworkMessage<T::EthSpec>>,
|
network_tx: &UnboundedSender<NetworkMessage<T::EthSpec>>,
|
||||||
log: Logger,
|
log: Logger,
|
||||||
validation_level: BroadcastValidation,
|
validation_level: BroadcastValidation,
|
||||||
) -> Result<(), Rejection> {
|
duplicate_status_code: StatusCode,
|
||||||
|
) -> Result<Response, Rejection> {
|
||||||
let block_root = block.canonical_root();
|
let block_root = block.canonical_root();
|
||||||
let full_block: ProvenancedBlock<T, Arc<SignedBeaconBlock<T::EthSpec>>> =
|
let full_block: ProvenancedBlock<T, Arc<SignedBeaconBlock<T::EthSpec>>> =
|
||||||
reconstruct_block(chain.clone(), block_root, block, log.clone()).await?;
|
reconstruct_block(chain.clone(), block_root, block, log.clone()).await?;
|
||||||
@ -219,6 +237,7 @@ pub async fn publish_blinded_block<T: BeaconChainTypes>(
|
|||||||
network_tx,
|
network_tx,
|
||||||
log,
|
log,
|
||||||
validation_level,
|
validation_level,
|
||||||
|
duplicate_status_code,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
@ -159,46 +159,6 @@ impl<E: EthSpec> TaskSpawner<E> {
|
|||||||
.and_then(|x| x)
|
.and_then(|x| x)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Executes an async task which always returns a `Response`.
|
|
||||||
pub async fn spawn_async(
|
|
||||||
self,
|
|
||||||
priority: Priority,
|
|
||||||
func: impl Future<Output = Response> + Send + Sync + 'static,
|
|
||||||
) -> Response {
|
|
||||||
if let Some(beacon_processor_send) = &self.beacon_processor_send {
|
|
||||||
// Create a wrapper future that will execute `func` and send the
|
|
||||||
// result to a channel held by this thread.
|
|
||||||
let (tx, rx) = oneshot::channel();
|
|
||||||
let process_fn = async move {
|
|
||||||
// Await the future, collect the return value.
|
|
||||||
let func_result = func.await;
|
|
||||||
// Send the result down the channel. Ignore any failures; the
|
|
||||||
// send can only fail if the receiver is dropped.
|
|
||||||
let _ = tx.send(func_result);
|
|
||||||
};
|
|
||||||
|
|
||||||
// Send the function to the beacon processor for execution at some arbitrary time.
|
|
||||||
let result = send_to_beacon_processor(
|
|
||||||
beacon_processor_send,
|
|
||||||
priority,
|
|
||||||
BlockingOrAsync::Async(Box::pin(process_fn)),
|
|
||||||
rx,
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
convert_rejection(result).await
|
|
||||||
} else {
|
|
||||||
// There is no beacon processor so spawn a task directly on the
|
|
||||||
// tokio executor.
|
|
||||||
tokio::task::spawn(func).await.unwrap_or_else(|e| {
|
|
||||||
warp::reply::with_status(
|
|
||||||
warp::reply::json(&format!("Tokio did not execute task: {e:?}")),
|
|
||||||
eth2::StatusCode::INTERNAL_SERVER_ERROR,
|
|
||||||
)
|
|
||||||
.into_response()
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send a task to the beacon processor and await execution.
|
/// Send a task to the beacon processor and await execution.
|
||||||
|
@ -23,7 +23,7 @@ use network::{NetworkReceivers, NetworkSenders};
|
|||||||
use sensitive_url::SensitiveUrl;
|
use sensitive_url::SensitiveUrl;
|
||||||
use slog::Logger;
|
use slog::Logger;
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
use std::net::SocketAddr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use store::MemoryStore;
|
use store::MemoryStore;
|
||||||
@ -220,15 +220,9 @@ pub async fn create_api_server_on_port<T: BeaconChainTypes>(
|
|||||||
let ctx = Arc::new(Context {
|
let ctx = Arc::new(Context {
|
||||||
config: Config {
|
config: Config {
|
||||||
enabled: true,
|
enabled: true,
|
||||||
listen_addr: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
|
|
||||||
listen_port: port,
|
listen_port: port,
|
||||||
allow_origin: None,
|
|
||||||
tls_config: None,
|
|
||||||
allow_sync_stalled: false,
|
|
||||||
data_dir: std::path::PathBuf::from(DEFAULT_ROOT_DIR),
|
data_dir: std::path::PathBuf::from(DEFAULT_ROOT_DIR),
|
||||||
spec_fork_name: None,
|
..Config::default()
|
||||||
sse_capacity_multiplier: 1,
|
|
||||||
enable_beacon_processor: true,
|
|
||||||
},
|
},
|
||||||
chain: Some(chain),
|
chain: Some(chain),
|
||||||
network_senders: Some(network_senders),
|
network_senders: Some(network_senders),
|
||||||
|
@ -364,13 +364,14 @@ pub async fn consensus_partial_pass_only_consensus() {
|
|||||||
/* submit `block_b` which should induce equivocation */
|
/* submit `block_b` which should induce equivocation */
|
||||||
let channel = tokio::sync::mpsc::unbounded_channel();
|
let channel = tokio::sync::mpsc::unbounded_channel();
|
||||||
|
|
||||||
let publication_result: Result<(), Rejection> = publish_block(
|
let publication_result = publish_block(
|
||||||
None,
|
None,
|
||||||
ProvenancedBlock::local(gossip_block_b.unwrap()),
|
ProvenancedBlock::local(gossip_block_b.unwrap()),
|
||||||
tester.harness.chain.clone(),
|
tester.harness.chain.clone(),
|
||||||
&channel.0,
|
&channel.0,
|
||||||
test_logger,
|
test_logger,
|
||||||
validation_level.unwrap(),
|
validation_level.unwrap(),
|
||||||
|
StatusCode::ACCEPTED,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
@ -641,13 +642,14 @@ pub async fn equivocation_consensus_late_equivocation() {
|
|||||||
|
|
||||||
let channel = tokio::sync::mpsc::unbounded_channel();
|
let channel = tokio::sync::mpsc::unbounded_channel();
|
||||||
|
|
||||||
let publication_result: Result<(), Rejection> = publish_block(
|
let publication_result = publish_block(
|
||||||
None,
|
None,
|
||||||
ProvenancedBlock::local(gossip_block_b.unwrap()),
|
ProvenancedBlock::local(gossip_block_b.unwrap()),
|
||||||
tester.harness.chain,
|
tester.harness.chain,
|
||||||
&channel.0,
|
&channel.0,
|
||||||
test_logger,
|
test_logger,
|
||||||
validation_level.unwrap(),
|
validation_level.unwrap(),
|
||||||
|
StatusCode::ACCEPTED,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
@ -1294,12 +1296,13 @@ pub async fn blinded_equivocation_consensus_late_equivocation() {
|
|||||||
|
|
||||||
let channel = tokio::sync::mpsc::unbounded_channel();
|
let channel = tokio::sync::mpsc::unbounded_channel();
|
||||||
|
|
||||||
let publication_result: Result<(), Rejection> = publish_blinded_block(
|
let publication_result = publish_blinded_block(
|
||||||
block_b,
|
block_b,
|
||||||
tester.harness.chain,
|
tester.harness.chain,
|
||||||
&channel.0,
|
&channel.0,
|
||||||
test_logger,
|
test_logger,
|
||||||
validation_level.unwrap(),
|
validation_level.unwrap(),
|
||||||
|
StatusCode::ACCEPTED,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
@ -8,7 +8,7 @@ use eth2::{
|
|||||||
mixin::{RequestAccept, ResponseForkName, ResponseOptional},
|
mixin::{RequestAccept, ResponseForkName, ResponseOptional},
|
||||||
reqwest::RequestBuilder,
|
reqwest::RequestBuilder,
|
||||||
types::{BlockId as CoreBlockId, ForkChoiceNode, StateId as CoreStateId, *},
|
types::{BlockId as CoreBlockId, ForkChoiceNode, StateId as CoreStateId, *},
|
||||||
BeaconNodeHttpClient, Error, Timeouts,
|
BeaconNodeHttpClient, Error, StatusCode, Timeouts,
|
||||||
};
|
};
|
||||||
use execution_layer::test_utils::TestingBuilder;
|
use execution_layer::test_utils::TestingBuilder;
|
||||||
use execution_layer::test_utils::DEFAULT_BUILDER_THRESHOLD_WEI;
|
use execution_layer::test_utils::DEFAULT_BUILDER_THRESHOLD_WEI;
|
||||||
@ -1318,6 +1318,63 @@ impl ApiTester {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn test_post_beacon_blocks_duplicate(self) -> Self {
|
||||||
|
let block = self
|
||||||
|
.harness
|
||||||
|
.make_block(
|
||||||
|
self.harness.get_current_state(),
|
||||||
|
self.harness.get_current_slot(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.0;
|
||||||
|
|
||||||
|
assert!(self.client.post_beacon_blocks(&block).await.is_ok());
|
||||||
|
|
||||||
|
let blinded_block = block.clone_as_blinded();
|
||||||
|
|
||||||
|
// Test all the POST methods in sequence, they should all behave the same.
|
||||||
|
let responses = vec![
|
||||||
|
self.client.post_beacon_blocks(&block).await.unwrap_err(),
|
||||||
|
self.client
|
||||||
|
.post_beacon_blocks_v2(&block, None)
|
||||||
|
.await
|
||||||
|
.unwrap_err(),
|
||||||
|
self.client
|
||||||
|
.post_beacon_blocks_ssz(&block)
|
||||||
|
.await
|
||||||
|
.unwrap_err(),
|
||||||
|
self.client
|
||||||
|
.post_beacon_blocks_v2_ssz(&block, None)
|
||||||
|
.await
|
||||||
|
.unwrap_err(),
|
||||||
|
self.client
|
||||||
|
.post_beacon_blinded_blocks(&blinded_block)
|
||||||
|
.await
|
||||||
|
.unwrap_err(),
|
||||||
|
self.client
|
||||||
|
.post_beacon_blinded_blocks_v2(&blinded_block, None)
|
||||||
|
.await
|
||||||
|
.unwrap_err(),
|
||||||
|
self.client
|
||||||
|
.post_beacon_blinded_blocks_ssz(&blinded_block)
|
||||||
|
.await
|
||||||
|
.unwrap_err(),
|
||||||
|
self.client
|
||||||
|
.post_beacon_blinded_blocks_v2_ssz(&blinded_block, None)
|
||||||
|
.await
|
||||||
|
.unwrap_err(),
|
||||||
|
];
|
||||||
|
for (i, response) in responses.into_iter().enumerate() {
|
||||||
|
assert_eq!(
|
||||||
|
response.status().unwrap(),
|
||||||
|
StatusCode::ACCEPTED,
|
||||||
|
"response {i}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn test_beacon_blocks(self) -> Self {
|
pub async fn test_beacon_blocks(self) -> Self {
|
||||||
for block_id in self.interesting_block_ids() {
|
for block_id in self.interesting_block_ids() {
|
||||||
let expected = block_id
|
let expected = block_id
|
||||||
@ -4651,6 +4708,14 @@ async fn post_beacon_blocks_invalid() {
|
|||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
async fn post_beacon_blocks_duplicate() {
|
||||||
|
ApiTester::new()
|
||||||
|
.await
|
||||||
|
.test_post_beacon_blocks_duplicate()
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
async fn beacon_pools_post_attestations_valid() {
|
async fn beacon_pools_post_attestations_valid() {
|
||||||
ApiTester::new()
|
ApiTester::new()
|
||||||
|
@ -391,6 +391,15 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
|
|||||||
.help("Multiplier to apply to the length of HTTP server-sent-event (SSE) channels. \
|
.help("Multiplier to apply to the length of HTTP server-sent-event (SSE) channels. \
|
||||||
Increasing this value can prevent messages from being dropped.")
|
Increasing this value can prevent messages from being dropped.")
|
||||||
)
|
)
|
||||||
|
.arg(
|
||||||
|
Arg::with_name("http-duplicate-block-status")
|
||||||
|
.long("http-duplicate-block-status")
|
||||||
|
.takes_value(true)
|
||||||
|
.default_value("202")
|
||||||
|
.value_name("STATUS_CODE")
|
||||||
|
.help("Status code to send when a block that is already known is POSTed to the \
|
||||||
|
HTTP API.")
|
||||||
|
)
|
||||||
.arg(
|
.arg(
|
||||||
Arg::with_name("http-enable-beacon-processor")
|
Arg::with_name("http-enable-beacon-processor")
|
||||||
.long("http-enable-beacon-processor")
|
.long("http-enable-beacon-processor")
|
||||||
|
@ -155,6 +155,9 @@ pub fn get_config<E: EthSpec>(
|
|||||||
client_config.http_api.enable_beacon_processor =
|
client_config.http_api.enable_beacon_processor =
|
||||||
parse_required(cli_args, "http-enable-beacon-processor")?;
|
parse_required(cli_args, "http-enable-beacon-processor")?;
|
||||||
|
|
||||||
|
client_config.http_api.duplicate_block_status_code =
|
||||||
|
parse_required(cli_args, "http-duplicate-block-status")?;
|
||||||
|
|
||||||
if let Some(cache_size) = clap_utils::parse_optional(cli_args, "shuffling-cache-size")? {
|
if let Some(cache_size) = clap_utils::parse_optional(cli_args, "shuffling-cache-size")? {
|
||||||
client_config.chain.shuffling_cache_size = cache_size;
|
client_config.chain.shuffling_cache_size = cache_size;
|
||||||
}
|
}
|
||||||
|
@ -1316,6 +1316,26 @@ pub struct BroadcastValidationQuery {
|
|||||||
pub broadcast_validation: BroadcastValidation,
|
pub broadcast_validation: BroadcastValidation,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub mod serde_status_code {
|
||||||
|
use crate::StatusCode;
|
||||||
|
use serde::{de::Error, Deserialize, Serialize};
|
||||||
|
|
||||||
|
pub fn serialize<S>(status_code: &StatusCode, ser: S) -> Result<S::Ok, S::Error>
|
||||||
|
where
|
||||||
|
S: serde::Serializer,
|
||||||
|
{
|
||||||
|
status_code.as_u16().serialize(ser)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn deserialize<'de, D>(de: D) -> Result<StatusCode, D::Error>
|
||||||
|
where
|
||||||
|
D: serde::de::Deserializer<'de>,
|
||||||
|
{
|
||||||
|
let status_code = u16::deserialize(de)?;
|
||||||
|
StatusCode::try_from(status_code).map_err(D::Error::custom)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
@ -2387,3 +2387,22 @@ fn http_sse_capacity_multiplier_override() {
|
|||||||
.run_with_zero_port()
|
.run_with_zero_port()
|
||||||
.with_config(|config| assert_eq!(config.http_api.sse_capacity_multiplier, 10));
|
.with_config(|config| assert_eq!(config.http_api.sse_capacity_multiplier, 10));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn http_duplicate_block_status_default() {
|
||||||
|
CommandLineTest::new()
|
||||||
|
.run_with_zero_port()
|
||||||
|
.with_config(|config| {
|
||||||
|
assert_eq!(config.http_api.duplicate_block_status_code.as_u16(), 202)
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn http_duplicate_block_status_override() {
|
||||||
|
CommandLineTest::new()
|
||||||
|
.flag("http-duplicate-block-status", Some("301"))
|
||||||
|
.run_with_zero_port()
|
||||||
|
.with_config(|config| {
|
||||||
|
assert_eq!(config.http_api.duplicate_block_status_code.as_u16(), 301)
|
||||||
|
});
|
||||||
|
}
|
||||||
|
@ -10,7 +10,8 @@ use crate::{
|
|||||||
validator_store::{Error as ValidatorStoreError, ValidatorStore},
|
validator_store::{Error as ValidatorStoreError, ValidatorStore},
|
||||||
};
|
};
|
||||||
use environment::RuntimeContext;
|
use environment::RuntimeContext;
|
||||||
use eth2::BeaconNodeHttpClient;
|
use eth2::{BeaconNodeHttpClient, StatusCode};
|
||||||
|
use slog::Logger;
|
||||||
use slog::{crit, debug, error, info, trace, warn};
|
use slog::{crit, debug, error, info, trace, warn};
|
||||||
use slot_clock::SlotClock;
|
use slot_clock::SlotClock;
|
||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
@ -593,12 +594,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
|
|||||||
beacon_node
|
beacon_node
|
||||||
.post_beacon_blocks(&signed_block)
|
.post_beacon_blocks(&signed_block)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| {
|
.or_else(|e| handle_block_post_error(e, slot, log))?
|
||||||
BlockError::Irrecoverable(format!(
|
|
||||||
"Error from beacon node when publishing block: {:?}",
|
|
||||||
e
|
|
||||||
))
|
|
||||||
})?
|
|
||||||
}
|
}
|
||||||
BlockType::Blinded => {
|
BlockType::Blinded => {
|
||||||
let _post_timer = metrics::start_timer_vec(
|
let _post_timer = metrics::start_timer_vec(
|
||||||
@ -608,12 +604,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
|
|||||||
beacon_node
|
beacon_node
|
||||||
.post_beacon_blinded_blocks(&signed_block)
|
.post_beacon_blinded_blocks(&signed_block)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| {
|
.or_else(|e| handle_block_post_error(e, slot, log))?
|
||||||
BlockError::Irrecoverable(format!(
|
|
||||||
"Error from beacon node when publishing block: {:?}",
|
|
||||||
e
|
|
||||||
))
|
|
||||||
})?
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok::<_, BlockError>(())
|
Ok::<_, BlockError>(())
|
||||||
@ -634,3 +625,29 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn handle_block_post_error(err: eth2::Error, slot: Slot, log: &Logger) -> Result<(), BlockError> {
|
||||||
|
// Handle non-200 success codes.
|
||||||
|
if let Some(status) = err.status() {
|
||||||
|
if status == StatusCode::ACCEPTED {
|
||||||
|
info!(
|
||||||
|
log,
|
||||||
|
"Block is already known to BN or might be invalid";
|
||||||
|
"slot" => slot,
|
||||||
|
"status_code" => status.as_u16(),
|
||||||
|
);
|
||||||
|
return Ok(());
|
||||||
|
} else if status.is_success() {
|
||||||
|
debug!(
|
||||||
|
log,
|
||||||
|
"Block published with non-standard success code";
|
||||||
|
"slot" => slot,
|
||||||
|
"status_code" => status.as_u16(),
|
||||||
|
);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(BlockError::Irrecoverable(format!(
|
||||||
|
"Error from beacon node when publishing block: {err:?}",
|
||||||
|
)))
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user