Engine API v1.0.0.alpha.6 + interop tests (#3024)

## Issue Addressed

NA

## Proposed Changes

This PR extends #3018 to address my review comments there and add automated integration tests with Geth (and other implementations, in the future).

I've also de-duplicated the "unused port" logic by creating an  `common/unused_port` crate.

## Additional Info

I'm not sure if we want to merge this PR, or update #3018 and merge that. I don't mind, I'm primarily opening this PR to make sure CI works.


Co-authored-by: Mark Mackey <mark@sigmaprime.io>
This commit is contained in:
Paul Hauner 2022-02-17 21:47:06 +00:00
parent 2f8531dc60
commit 0a6a8ea3b0
40 changed files with 1125 additions and 363 deletions

View File

@ -44,7 +44,7 @@ jobs:
run: make test-release
release-tests-windows:
name: release-tests-windows
runs-on: windows-latest
runs-on: windows-2019
needs: cargo-fmt
steps:
- uses: actions/checkout@v1
@ -184,6 +184,16 @@ jobs:
run: |
cd scripts/tests
./doppelganger_protection.sh failure
execution-engine-integration-ubuntu:
name: execution-engine-integration-ubuntu
runs-on: ubuntu-latest
needs: cargo-fmt
steps:
- uses: actions/checkout@v1
- name: Get latest version of stable Rust
run: rustup update stable
- name: Run exec engine integration tests in release
run: make test-exec-engine
check-benchmarks:
name: check-benchmarks
runs-on: ubuntu-latest

29
Cargo.lock generated
View File

@ -361,6 +361,7 @@ dependencies = [
"store",
"task_executor",
"types",
"unused_port",
]
[[package]]
@ -626,9 +627,9 @@ dependencies = [
[[package]]
name = "cc"
version = "1.0.72"
version = "1.0.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22a9137b95ea06864e018375b72adfb7db6e6f68cfc8df5a04d00288050485ee"
checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11"
[[package]]
name = "cexpr"
@ -1499,6 +1500,7 @@ dependencies = [
"serde_json",
"tokio",
"types",
"unused_port",
"web3",
]
@ -1804,6 +1806,23 @@ dependencies = [
"uint 0.9.3",
]
[[package]]
name = "execution_engine_integration"
version = "0.1.0"
dependencies = [
"environment",
"execution_layer",
"exit-future",
"futures",
"sensitive_url",
"serde_json",
"task_executor",
"tempfile",
"tokio",
"types",
"unused_port",
]
[[package]]
name = "execution_layer"
version = "0.1.0"
@ -3325,6 +3344,7 @@ dependencies = [
"task_executor",
"tempfile",
"types",
"unused_port",
"validator_client",
"validator_dir",
]
@ -3380,6 +3400,7 @@ dependencies = [
"tokio-util",
"types",
"unsigned-varint 0.6.0",
"unused_port",
"void",
]
@ -6599,6 +6620,10 @@ version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
[[package]]
name = "unused_port"
version = "0.1.0"
[[package]]
name = "url"
version = "2.2.2"

View File

@ -39,6 +39,7 @@ members = [
"common/task_executor",
"common/target_check",
"common/test_random_derive",
"common/unused_port",
"common/validator_dir",
"common/warp_utils",
"common/fallback",
@ -74,6 +75,7 @@ members = [
"testing/ef_tests",
"testing/eth1_test_rig",
"testing/execution_engine_integration",
"testing/node_test_rig",
"testing/simulator",
"testing/test-test_logger",

View File

@ -2,6 +2,7 @@
EF_TESTS = "testing/ef_tests"
STATE_TRANSITION_VECTORS = "testing/state_transition_vectors"
EXECUTION_ENGINE_INTEGRATION = "testing/execution_engine_integration"
GIT_TAG := $(shell git describe --tags --candidates 1)
BIN_DIR = "bin"
@ -123,12 +124,16 @@ run-state-transition-tests:
# Downloads and runs the EF test vectors.
test-ef: make-ef-tests run-ef-tests
# Runs tests checking interop between Lighthouse and execution clients.
test-exec-engine:
make -C $(EXECUTION_ENGINE_INTEGRATION) test
# Runs the full workspace tests in release, without downloading any additional
# test vectors.
test: test-release
# Runs the entire test suite, downloading test vectors if required.
test-full: cargo-fmt test-release test-debug test-ef
test-full: cargo-fmt test-release test-debug test-ef test-exec-engine
# Lints the code for bad style and potentially unsafe arithmetic using Clippy.
# Clippy lints are opt-in per-crate for now. By default, everything is allowed except for performance and correctness lints.

View File

@ -39,3 +39,4 @@ slasher = { path = "../slasher" }
monitoring_api = { path = "../common/monitoring_api" }
sensitive_url = { path = "../common/sensitive_url" }
http_api = { path = "http_api" }
unused_port = { path = "../common/unused_port" }

View File

@ -52,7 +52,7 @@ use crate::{metrics, BeaconChainError};
use eth2::types::{
EventKind, SseBlock, SseChainReorg, SseFinalizedCheckpoint, SseHead, SseLateHead, SyncDuty,
};
use execution_layer::ExecutionLayer;
use execution_layer::{ExecutionLayer, PayloadStatusV1Status};
use fork_choice::{AttestationFromBlock, ForkChoice};
use futures::channel::mpsc::Sender;
use itertools::process_results;
@ -3590,10 +3590,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
store,
new_finalized_checkpoint.root,
new_head_execution_block_hash,
&log,
)
.await
{
debug!(
crit!(
log,
"Failed to update execution head";
"error" => ?e
@ -3613,6 +3614,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
store: BeaconStore<T>,
finalized_beacon_block_root: Hash256,
head_execution_block_hash: Hash256,
log: &Logger,
) -> Result<(), Error> {
// Loading the finalized block from the store is not ideal. Perhaps it would be better to
// store it on fork-choice so we can do a lookup without hitting the database.
@ -3630,14 +3632,45 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map(|ep| ep.block_hash)
.unwrap_or_else(Hash256::zero);
execution_layer
let forkchoice_updated_response = execution_layer
.notify_forkchoice_updated(
head_execution_block_hash,
finalized_execution_block_hash,
None,
)
.await
.map_err(Error::ExecutionForkChoiceUpdateFailed)
.map_err(Error::ExecutionForkChoiceUpdateFailed);
match forkchoice_updated_response {
Ok((status, latest_valid_hash)) => match status {
PayloadStatusV1Status::Valid | PayloadStatusV1Status::Syncing => Ok(()),
// The specification doesn't list `ACCEPTED` as a valid response to a fork choice
// update. This response *seems* innocent enough, so we won't return early with an
// error. However, we create a log to bring attention to the issue.
PayloadStatusV1Status::Accepted => {
warn!(
log,
"Fork choice update received ACCEPTED";
"msg" => "execution engine provided an unexpected response to a fork \
choice update. although this is not a serious issue, please raise \
an issue."
);
Ok(())
}
PayloadStatusV1Status::Invalid
| PayloadStatusV1Status::InvalidTerminalBlock
| PayloadStatusV1Status::InvalidBlockHash => {
// TODO(bellatrix): process the invalid payload.
//
// See: https://github.com/sigp/lighthouse/pull/2837
Err(BeaconChainError::ExecutionForkChoiceUpdateInvalid {
status,
latest_valid_hash,
})
}
},
Err(e) => Err(e),
}
}
/// Returns the status of the current head block, regarding the validity of the execution

View File

@ -54,6 +54,7 @@ use crate::{
metrics, BeaconChain, BeaconChainError, BeaconChainTypes,
};
use eth2::types::EventKind;
use execution_layer::PayloadStatusV1Status;
use fork_choice::{ForkChoice, ForkChoiceStore, PayloadVerificationStatus};
use parking_lot::RwLockReadGuard;
use proto_array::Block as ProtoBlock;
@ -269,7 +270,10 @@ pub enum ExecutionPayloadError {
/// ## Peer scoring
///
/// The block is invalid and the peer is faulty
RejectedByExecutionEngine,
RejectedByExecutionEngine {
status: PayloadStatusV1Status,
latest_valid_hash: Option<Vec<Hash256>>,
},
/// The execution payload timestamp does not match the slot
///
/// ## Peer scoring

View File

@ -8,6 +8,7 @@ use crate::naive_aggregation_pool::Error as NaiveAggregationError;
use crate::observed_aggregates::Error as ObservedAttestationsError;
use crate::observed_attesters::Error as ObservedAttestersError;
use crate::observed_block_producers::Error as ObservedBlockProducersError;
use execution_layer::PayloadStatusV1Status;
use futures::channel::mpsc::TrySendError;
use operation_pool::OpPoolError;
use safe_arith::ArithError;
@ -137,6 +138,10 @@ pub enum BeaconChainError {
AltairForkDisabled,
ExecutionLayerMissing,
ExecutionForkChoiceUpdateFailed(execution_layer::Error),
ExecutionForkChoiceUpdateInvalid {
status: PayloadStatusV1Status,
latest_valid_hash: Option<Vec<Hash256>>,
},
BlockRewardSlotError,
BlockRewardAttestationError,
BlockRewardSyncError,

View File

@ -11,7 +11,7 @@ use crate::{
BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, BlockProductionError,
ExecutionPayloadError,
};
use execution_layer::ExecutePayloadResponseStatus;
use execution_layer::PayloadStatusV1Status;
use fork_choice::PayloadVerificationStatus;
use proto_array::{Block as ProtoBlock, ExecutionStatus};
use slog::debug;
@ -53,19 +53,29 @@ pub fn notify_new_payload<T: BeaconChainTypes>(
.execution_layer
.as_ref()
.ok_or(ExecutionPayloadError::NoExecutionConnection)?;
let notify_new_payload_response = execution_layer
let new_payload_response = execution_layer
.block_on(|execution_layer| execution_layer.notify_new_payload(execution_payload));
match notify_new_payload_response {
Ok((status, _latest_valid_hash)) => match status {
ExecutePayloadResponseStatus::Valid => Ok(PayloadVerificationStatus::Verified),
// TODO(merge): invalidate any invalid ancestors of this block in fork choice.
ExecutePayloadResponseStatus::Invalid => {
Err(ExecutionPayloadError::RejectedByExecutionEngine.into())
match new_payload_response {
Ok((status, latest_valid_hash)) => match status {
PayloadStatusV1Status::Valid => Ok(PayloadVerificationStatus::Verified),
PayloadStatusV1Status::Syncing | PayloadStatusV1Status::Accepted => {
Ok(PayloadVerificationStatus::NotVerified)
}
PayloadStatusV1Status::Invalid
| PayloadStatusV1Status::InvalidTerminalBlock
| PayloadStatusV1Status::InvalidBlockHash => {
// TODO(bellatrix): process the invalid payload.
//
// See: https://github.com/sigp/lighthouse/pull/2837
Err(ExecutionPayloadError::RejectedByExecutionEngine {
status,
latest_valid_hash,
}
.into())
}
ExecutePayloadResponseStatus::Syncing => Ok(PayloadVerificationStatus::NotVerified),
},
Err(_) => Err(ExecutionPayloadError::RejectedByExecutionEngine.into()),
Err(e) => Err(ExecutionPayloadError::RequestFailed(e).into()),
}
}

View File

@ -681,6 +681,7 @@ where
store,
head.finalized_checkpoint.root,
block_hash,
&log,
)
.await;

View File

@ -55,10 +55,10 @@ pub trait EngineApi {
block_hash: Hash256,
) -> Result<Option<ExecutionBlock>, Error>;
async fn notify_new_payload_v1<T: EthSpec>(
async fn new_payload_v1<T: EthSpec>(
&self,
execution_payload: ExecutionPayload<T>,
) -> Result<ExecutePayloadResponse, Error>;
) -> Result<PayloadStatusV1, Error>;
async fn get_payload_v1<T: EthSpec>(
&self,
@ -73,15 +73,18 @@ pub trait EngineApi {
}
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ExecutePayloadResponseStatus {
pub enum PayloadStatusV1Status {
Valid,
Invalid,
Syncing,
Accepted,
InvalidBlockHash,
InvalidTerminalBlock,
}
#[derive(Clone, Debug, PartialEq)]
pub struct ExecutePayloadResponse {
pub status: ExecutePayloadResponseStatus,
pub struct PayloadStatusV1 {
pub status: PayloadStatusV1Status,
pub latest_valid_hash: Option<Hash256>,
pub validation_error: Option<String>,
}
@ -110,13 +113,8 @@ pub struct PayloadAttributes {
pub suggested_fee_recipient: Address,
}
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ForkchoiceUpdatedResponseStatus {
Success,
Syncing,
}
#[derive(Clone, Debug, PartialEq)]
pub struct ForkchoiceUpdatedResponse {
pub status: ForkchoiceUpdatedResponseStatus,
pub payload_status: PayloadStatusV1,
pub payload_id: Option<PayloadId>,
}

View File

@ -27,8 +27,8 @@ pub const ETH_GET_BLOCK_BY_HASH_TIMEOUT: Duration = Duration::from_secs(1);
pub const ETH_SYNCING: &str = "eth_syncing";
pub const ETH_SYNCING_TIMEOUT: Duration = Duration::from_millis(250);
pub const ENGINE_EXECUTE_PAYLOAD_V1: &str = "engine_executePayloadV1";
pub const ENGINE_EXECUTE_PAYLOAD_TIMEOUT: Duration = Duration::from_secs(2);
pub const ENGINE_NEW_PAYLOAD_V1: &str = "engine_newPayloadV1";
pub const ENGINE_NEW_PAYLOAD_TIMEOUT: Duration = Duration::from_secs(2);
pub const ENGINE_GET_PAYLOAD_V1: &str = "engine_getPayloadV1";
pub const ENGINE_GET_PAYLOAD_TIMEOUT: Duration = Duration::from_secs(2);
@ -133,18 +133,14 @@ impl EngineApi for HttpJsonRpc {
.await
}
async fn notify_new_payload_v1<T: EthSpec>(
async fn new_payload_v1<T: EthSpec>(
&self,
execution_payload: ExecutionPayload<T>,
) -> Result<ExecutePayloadResponse, Error> {
) -> Result<PayloadStatusV1, Error> {
let params = json!([JsonExecutionPayloadV1::from(execution_payload)]);
let response: JsonExecutePayloadV1Response = self
.rpc_request(
ENGINE_EXECUTE_PAYLOAD_V1,
params,
ENGINE_EXECUTE_PAYLOAD_TIMEOUT,
)
let response: JsonPayloadStatusV1 = self
.rpc_request(ENGINE_NEW_PAYLOAD_V1, params, ENGINE_NEW_PAYLOAD_TIMEOUT)
.await?;
Ok(response.into())
@ -486,12 +482,12 @@ mod test {
}
#[tokio::test]
async fn notify_new_payload_v1_request() {
async fn new_payload_v1_request() {
Tester::new()
.assert_request_equals(
|client| async move {
let _ = client
.notify_new_payload_v1::<MainnetEthSpec>(ExecutionPayload {
.new_payload_v1::<MainnetEthSpec>(ExecutionPayload {
parent_hash: Hash256::repeat_byte(0),
fee_recipient: Address::repeat_byte(1),
state_root: Hash256::repeat_byte(1),
@ -512,7 +508,7 @@ mod test {
json!({
"id": STATIC_ID,
"jsonrpc": JSONRPC_VERSION,
"method": ENGINE_EXECUTE_PAYLOAD_V1,
"method": ENGINE_NEW_PAYLOAD_V1,
"params": [{
"parentHash": HASH_00,
"feeRecipient": ADDRESS_01,
@ -627,7 +623,11 @@ mod test {
"id": STATIC_ID,
"jsonrpc": JSONRPC_VERSION,
"result": {
"status": "SUCCESS",
"payloadStatus": {
"status": "VALID",
"latestValidHash": HASH_00,
"validationError": ""
},
"payloadId": "0xa247243752eb10b4"
}
})],
@ -648,7 +648,11 @@ mod test {
.await
.unwrap();
assert_eq!(response, ForkchoiceUpdatedResponse {
status: ForkchoiceUpdatedResponseStatus::Success,
payload_status: PayloadStatusV1 {
status: PayloadStatusV1Status::Valid,
latest_valid_hash: Some(Hash256::zero()),
validation_error: Some(String::new()),
},
payload_id:
Some(str_to_payload_id("0xa247243752eb10b4")),
});
@ -683,12 +687,12 @@ mod test {
"logsBloom": LOGS_BLOOM_00,
"random": HASH_00,
"blockNumber":"0x1",
"gasLimit":"0x1c9c380",
"gasLimit":"0x1c95111",
"gasUsed":"0x0",
"timestamp":"0x5",
"extraData":"0x",
"baseFeePerGas":"0x7",
"blockHash":"0x3559e851470f6e7bbed1db474980683e8c315bfce99b2a6ef47c057c04de7858",
"blockHash":"0x6359b8381a370e2f54072a5784ddd78b6ed024991558c511d4452eb4f6ac898c",
"transactions":[]
}
})],
@ -706,12 +710,12 @@ mod test {
logs_bloom: vec![0; 256].into(),
random: Hash256::zero(),
block_number: 1,
gas_limit: u64::from_str_radix("1c9c380",16).unwrap(),
gas_limit: u64::from_str_radix("1c95111",16).unwrap(),
gas_used: 0,
timestamp: 5,
extra_data: vec![].into(),
base_fee_per_gas: Uint256::from(7),
block_hash: Hash256::from_str("0x3559e851470f6e7bbed1db474980683e8c315bfce99b2a6ef47c057c04de7858").unwrap(),
block_hash: Hash256::from_str("0x6359b8381a370e2f54072a5784ddd78b6ed024991558c511d4452eb4f6ac898c").unwrap(),
transactions: vec![].into(),
};
@ -720,10 +724,10 @@ mod test {
)
.await
.assert_request_equals(
// engine_executePayloadV1 REQUEST validation
// engine_newPayloadV1 REQUEST validation
|client| async move {
let _ = client
.notify_new_payload_v1::<MainnetEthSpec>(ExecutionPayload {
.new_payload_v1::<MainnetEthSpec>(ExecutionPayload {
parent_hash: Hash256::from_str("0x3b8fb240d288781d4aac94d3fd16809ee413bc99294a085798a589dae51ddd4a").unwrap(),
fee_recipient: Address::from_str("0xa94f5374fce5edbc8e2a8697c15331677e6ebf0b").unwrap(),
state_root: Hash256::from_str("0xca3149fa9e37db08d1cd49c9061db1002ef1cd58db2210f2115c8c989b2bdf45").unwrap(),
@ -744,7 +748,7 @@ mod test {
json!({
"id": STATIC_ID,
"jsonrpc": JSONRPC_VERSION,
"method": ENGINE_EXECUTE_PAYLOAD_V1,
"method": ENGINE_NEW_PAYLOAD_V1,
"params": [{
"parentHash":"0x3b8fb240d288781d4aac94d3fd16809ee413bc99294a085798a589dae51ddd4a",
"feeRecipient":"0xa94f5374fce5edbc8e2a8697c15331677e6ebf0b",
@ -765,26 +769,27 @@ mod test {
)
.await
.with_preloaded_responses(
// engine_executePayloadV1 RESPONSE validation
// engine_newPayloadV1 RESPONSE validation
vec![json!({
"jsonrpc": JSONRPC_VERSION,
"id": STATIC_ID,
"result":{
"status":"VALID",
"latestValidHash":"0x3559e851470f6e7bbed1db474980683e8c315bfce99b2a6ef47c057c04de7858"
"latestValidHash":"0x3559e851470f6e7bbed1db474980683e8c315bfce99b2a6ef47c057c04de7858",
"validationError":"",
}
})],
|client| async move {
let response = client
.notify_new_payload_v1::<MainnetEthSpec>(ExecutionPayload::default())
.new_payload_v1::<MainnetEthSpec>(ExecutionPayload::default())
.await
.unwrap();
assert_eq!(response,
ExecutePayloadResponse {
status: ExecutePayloadResponseStatus::Valid,
PayloadStatusV1 {
status: PayloadStatusV1Status::Valid,
latest_valid_hash: Some(Hash256::from_str("0x3559e851470f6e7bbed1db474980683e8c315bfce99b2a6ef47c057c04de7858").unwrap()),
validation_error: None
validation_error: Some(String::new()),
}
);
},
@ -819,14 +824,15 @@ mod test {
.await
.with_preloaded_responses(
// engine_forkchoiceUpdatedV1 RESPONSE validation
//
// Note: this test was modified to provide `null` rather than `0x`. The geth vectors
// are invalid.
vec![json!({
"jsonrpc": JSONRPC_VERSION,
"id": STATIC_ID,
"result": {
"status":"SUCCESS",
"payloadStatus": {
"status": "VALID",
"latestValidHash": HASH_00,
"validationError": ""
},
"payloadId": JSON_NULL,
}
})],
@ -843,7 +849,11 @@ mod test {
.await
.unwrap();
assert_eq!(response, ForkchoiceUpdatedResponse {
status: ForkchoiceUpdatedResponseStatus::Success,
payload_status: PayloadStatusV1 {
status: PayloadStatusV1Status::Valid,
latest_valid_hash: Some(Hash256::zero()),
validation_error: Some(String::new()),
},
payload_id: None,
});
},

View File

@ -247,47 +247,60 @@ impl From<JsonForkChoiceStateV1> for ForkChoiceState {
#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum JsonExecutePayloadV1ResponseStatus {
pub enum JsonPayloadStatusV1Status {
Valid,
Invalid,
Syncing,
Accepted,
InvalidBlockHash,
InvalidTerminalBlock,
}
#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct JsonExecutePayloadV1Response {
pub status: JsonExecutePayloadV1ResponseStatus,
pub struct JsonPayloadStatusV1 {
pub status: JsonPayloadStatusV1Status,
pub latest_valid_hash: Option<Hash256>,
pub validation_error: Option<String>,
}
impl From<ExecutePayloadResponseStatus> for JsonExecutePayloadV1ResponseStatus {
fn from(e: ExecutePayloadResponseStatus) -> Self {
impl From<PayloadStatusV1Status> for JsonPayloadStatusV1Status {
fn from(e: PayloadStatusV1Status) -> Self {
match e {
ExecutePayloadResponseStatus::Valid => JsonExecutePayloadV1ResponseStatus::Valid,
ExecutePayloadResponseStatus::Invalid => JsonExecutePayloadV1ResponseStatus::Invalid,
ExecutePayloadResponseStatus::Syncing => JsonExecutePayloadV1ResponseStatus::Syncing,
PayloadStatusV1Status::Valid => JsonPayloadStatusV1Status::Valid,
PayloadStatusV1Status::Invalid => JsonPayloadStatusV1Status::Invalid,
PayloadStatusV1Status::Syncing => JsonPayloadStatusV1Status::Syncing,
PayloadStatusV1Status::Accepted => JsonPayloadStatusV1Status::Accepted,
PayloadStatusV1Status::InvalidBlockHash => JsonPayloadStatusV1Status::InvalidBlockHash,
PayloadStatusV1Status::InvalidTerminalBlock => {
JsonPayloadStatusV1Status::InvalidTerminalBlock
}
}
}
}
impl From<JsonExecutePayloadV1ResponseStatus> for ExecutePayloadResponseStatus {
fn from(j: JsonExecutePayloadV1ResponseStatus) -> Self {
impl From<JsonPayloadStatusV1Status> for PayloadStatusV1Status {
fn from(j: JsonPayloadStatusV1Status) -> Self {
match j {
JsonExecutePayloadV1ResponseStatus::Valid => ExecutePayloadResponseStatus::Valid,
JsonExecutePayloadV1ResponseStatus::Invalid => ExecutePayloadResponseStatus::Invalid,
JsonExecutePayloadV1ResponseStatus::Syncing => ExecutePayloadResponseStatus::Syncing,
JsonPayloadStatusV1Status::Valid => PayloadStatusV1Status::Valid,
JsonPayloadStatusV1Status::Invalid => PayloadStatusV1Status::Invalid,
JsonPayloadStatusV1Status::Syncing => PayloadStatusV1Status::Syncing,
JsonPayloadStatusV1Status::Accepted => PayloadStatusV1Status::Accepted,
JsonPayloadStatusV1Status::InvalidBlockHash => PayloadStatusV1Status::InvalidBlockHash,
JsonPayloadStatusV1Status::InvalidTerminalBlock => {
PayloadStatusV1Status::InvalidTerminalBlock
}
}
}
}
impl From<ExecutePayloadResponse> for JsonExecutePayloadV1Response {
fn from(e: ExecutePayloadResponse) -> Self {
impl From<PayloadStatusV1> for JsonPayloadStatusV1 {
fn from(p: PayloadStatusV1) -> Self {
// Use this verbose deconstruction pattern to ensure no field is left unused.
let ExecutePayloadResponse {
let PayloadStatusV1 {
status,
latest_valid_hash,
validation_error,
} = e;
} = p;
Self {
status: status.into(),
@ -297,10 +310,10 @@ impl From<ExecutePayloadResponse> for JsonExecutePayloadV1Response {
}
}
impl From<JsonExecutePayloadV1Response> for ExecutePayloadResponse {
fn from(j: JsonExecutePayloadV1Response) -> Self {
impl From<JsonPayloadStatusV1> for PayloadStatusV1 {
fn from(j: JsonPayloadStatusV1) -> Self {
// Use this verbose deconstruction pattern to ensure no field is left unused.
let JsonExecutePayloadV1Response {
let JsonPayloadStatusV1 {
status,
latest_valid_hash,
validation_error,
@ -314,50 +327,23 @@ impl From<JsonExecutePayloadV1Response> for ExecutePayloadResponse {
}
}
#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum JsonForkchoiceUpdatedV1ResponseStatus {
Success,
Syncing,
}
#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct JsonForkchoiceUpdatedV1Response {
pub status: JsonForkchoiceUpdatedV1ResponseStatus,
pub payload_status: JsonPayloadStatusV1,
pub payload_id: Option<TransparentJsonPayloadId>,
}
impl From<JsonForkchoiceUpdatedV1ResponseStatus> for ForkchoiceUpdatedResponseStatus {
fn from(j: JsonForkchoiceUpdatedV1ResponseStatus) -> Self {
match j {
JsonForkchoiceUpdatedV1ResponseStatus::Success => {
ForkchoiceUpdatedResponseStatus::Success
}
JsonForkchoiceUpdatedV1ResponseStatus::Syncing => {
ForkchoiceUpdatedResponseStatus::Syncing
}
}
}
}
impl From<ForkchoiceUpdatedResponseStatus> for JsonForkchoiceUpdatedV1ResponseStatus {
fn from(f: ForkchoiceUpdatedResponseStatus) -> Self {
match f {
ForkchoiceUpdatedResponseStatus::Success => {
JsonForkchoiceUpdatedV1ResponseStatus::Success
}
ForkchoiceUpdatedResponseStatus::Syncing => {
JsonForkchoiceUpdatedV1ResponseStatus::Syncing
}
}
}
}
impl From<JsonForkchoiceUpdatedV1Response> for ForkchoiceUpdatedResponse {
fn from(j: JsonForkchoiceUpdatedV1Response) -> Self {
// Use this verbose deconstruction pattern to ensure no field is left unused.
let JsonForkchoiceUpdatedV1Response { status, payload_id } = j;
let JsonForkchoiceUpdatedV1Response {
payload_status: status,
payload_id,
} = j;
Self {
status: status.into(),
payload_status: status.into(),
payload_id: payload_id.map(Into::into),
}
}
@ -365,10 +351,13 @@ impl From<JsonForkchoiceUpdatedV1Response> for ForkchoiceUpdatedResponse {
impl From<ForkchoiceUpdatedResponse> for JsonForkchoiceUpdatedV1Response {
fn from(f: ForkchoiceUpdatedResponse) -> Self {
// Use this verbose deconstruction pattern to ensure no field is left unused.
let ForkchoiceUpdatedResponse { status, payload_id } = f;
let ForkchoiceUpdatedResponse {
payload_status: status,
payload_id,
} = f;
Self {
status: status.into(),
payload_status: status.into(),
payload_id: payload_id.map(Into::into),
}
}

View File

@ -1,6 +1,8 @@
//! Provides generic behaviour for multiple execution engines, specifically fallback behaviour.
use crate::engine_api::{EngineApi, Error as EngineApiError, PayloadAttributes, PayloadId};
use crate::engine_api::{
EngineApi, Error as EngineApiError, ForkchoiceUpdatedResponse, PayloadAttributes, PayloadId,
};
use futures::future::join_all;
use lru::LruCache;
use slog::{crit, debug, info, warn, Logger};
@ -97,7 +99,7 @@ impl<T: EngineApi> Engine<T> {
forkchoice_state: ForkChoiceState,
payload_attributes: Option<PayloadAttributes>,
log: &Logger,
) -> Result<Option<PayloadId>, EngineApiError> {
) -> Result<ForkchoiceUpdatedResponse, EngineApiError> {
let response = self
.api
.forkchoice_updated_v1(forkchoice_state, payload_attributes)
@ -117,7 +119,7 @@ impl<T: EngineApi> Engine<T> {
}
}
Ok(response.payload_id)
Ok(response)
}
}

View File

@ -10,7 +10,7 @@ use lru::LruCache;
use sensitive_url::SensitiveUrl;
use slog::{crit, debug, error, info, Logger};
use slot_clock::SlotClock;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
@ -21,7 +21,7 @@ use tokio::{
};
use types::{ChainSpec, Epoch, ProposerPreparationData};
pub use engine_api::{http::HttpJsonRpc, ExecutePayloadResponseStatus};
pub use engine_api::{http::HttpJsonRpc, PayloadAttributes, PayloadStatusV1Status};
mod engine_api;
mod engines;
@ -49,6 +49,7 @@ pub enum Error {
NotSynced,
ShuttingDown,
FeeRecipientUnspecified,
ConsensusFailure,
}
impl From<ApiError> for Error {
@ -249,7 +250,7 @@ impl ExecutionLayer {
}
/// Performs a single execution of the watchdog routine.
async fn watchdog_task(&self) {
pub async fn watchdog_task(&self) {
// Disable logging since this runs frequently and may get annoying.
self.engines().upcheck_not_synced(Logging::Disabled).await;
}
@ -431,7 +432,8 @@ impl ExecutionLayer {
Some(payload_attributes),
self.log(),
)
.await?
.await
.map(|response| response.payload_id)?
.ok_or(ApiError::PayloadIdUnavailable)?
};
@ -449,6 +451,7 @@ impl ExecutionLayer {
/// failure) from all nodes and then return based on the first of these conditions which
/// returns true:
///
/// - Error::ConsensusFailure if some nodes return valid and some return invalid
/// - Valid, if any nodes return valid.
/// - Invalid, if any nodes return invalid.
/// - Syncing, if any nodes return syncing.
@ -456,10 +459,10 @@ impl ExecutionLayer {
pub async fn notify_new_payload<T: EthSpec>(
&self,
execution_payload: &ExecutionPayload<T>,
) -> Result<(ExecutePayloadResponseStatus, Option<Hash256>), Error> {
) -> Result<(PayloadStatusV1Status, Option<Vec<Hash256>>), Error> {
debug!(
self.log(),
"Issuing engine_executePayload";
"Issuing engine_newPayload";
"parent_hash" => ?execution_payload.parent_hash,
"block_hash" => ?execution_payload.block_hash,
"block_number" => execution_payload.block_number,
@ -467,46 +470,55 @@ impl ExecutionLayer {
let broadcast_results = self
.engines()
.broadcast(|engine| engine.api.notify_new_payload_v1(execution_payload.clone()))
.broadcast(|engine| engine.api.new_payload_v1(execution_payload.clone()))
.await;
let mut errors = vec![];
let mut valid = 0;
let mut invalid = 0;
let mut syncing = 0;
let mut invalid_latest_valid_hash = vec![];
let mut invalid_latest_valid_hash = HashSet::new();
for result in broadcast_results {
match result.map(|response| (response.latest_valid_hash, response.status)) {
Ok((Some(latest_hash), ExecutePayloadResponseStatus::Valid)) => {
if latest_hash == execution_payload.block_hash {
valid += 1;
} else {
invalid += 1;
errors.push(EngineError::Api {
id: "unknown".to_string(),
error: engine_api::Error::BadResponse(
format!(
"notify_new_payload: response.status = Valid but invalid latest_valid_hash. Expected({:?}) Found({:?})",
execution_payload.block_hash,
latest_hash,
)
),
});
invalid_latest_valid_hash.push(latest_hash);
match result {
Ok(response) => match (&response.latest_valid_hash, &response.status) {
(Some(latest_hash), &PayloadStatusV1Status::Valid) => {
// According to a strict interpretation of the spec, the EE should never
// respond with `VALID` *and* a `latest_valid_hash`.
//
// For the sake of being liberal with what we accept, we will accept a
// `latest_valid_hash` *only if* it matches the submitted payload.
// Otherwise, register an error.
if latest_hash == &execution_payload.block_hash {
valid += 1;
} else {
errors.push(EngineError::Api {
id: "unknown".to_string(),
error: engine_api::Error::BadResponse(
format!(
"new_payload: response.status = Valid but invalid latest_valid_hash. Expected({:?}) Found({:?})",
execution_payload.block_hash,
latest_hash,
)
),
});
}
}
}
Ok((Some(latest_hash), ExecutePayloadResponseStatus::Invalid)) => {
invalid += 1;
invalid_latest_valid_hash.push(latest_hash);
}
Ok((_, ExecutePayloadResponseStatus::Syncing)) => syncing += 1,
Ok((None, status)) => errors.push(EngineError::Api {
id: "unknown".to_string(),
error: engine_api::Error::BadResponse(format!(
"notify_new_payload: status {:?} returned with null latest_valid_hash",
status
)),
}),
(Some(latest_hash), &PayloadStatusV1Status::Invalid) => {
invalid += 1;
invalid_latest_valid_hash.insert(*latest_hash);
}
(None, &PayloadStatusV1Status::InvalidBlockHash)
| (None, &PayloadStatusV1Status::InvalidTerminalBlock) => invalid += 1,
(None, &PayloadStatusV1Status::Syncing)
| (None, &PayloadStatusV1Status::Accepted) => syncing += 1,
_ => errors.push(EngineError::Api {
id: "unknown".to_string(),
error: engine_api::Error::BadResponse(format!(
"new_payload: response does not conform to engine API spec: {:?}",
response,
)),
}),
},
Err(e) => errors.push(e),
}
}
@ -515,19 +527,24 @@ impl ExecutionLayer {
crit!(
self.log(),
"Consensus failure between execution nodes";
"method" => "notify_new_payload"
"method" => "new_payload"
);
// In this situation, better to have a failure of liveness than vote on a potentially invalid chain
return Err(Error::ConsensusFailure);
}
if valid > 0 {
Ok((
ExecutePayloadResponseStatus::Valid,
Some(execution_payload.block_hash),
PayloadStatusV1Status::Valid,
Some(vec![execution_payload.block_hash]),
))
} else if invalid > 0 {
Ok((ExecutePayloadResponseStatus::Invalid, None))
Ok((
PayloadStatusV1Status::Invalid,
Some(invalid_latest_valid_hash.into_iter().collect()),
))
} else if syncing > 0 {
Ok((ExecutePayloadResponseStatus::Syncing, None))
Ok((PayloadStatusV1Status::Syncing, None))
} else {
Err(Error::EngineErrors(errors))
}
@ -541,14 +558,17 @@ impl ExecutionLayer {
/// failure) from all nodes and then return based on the first of these conditions which
/// returns true:
///
/// - Ok, if any node returns successfully.
/// - Error::ConsensusFailure if some nodes return valid and some return invalid
/// - Valid, if any nodes return valid.
/// - Invalid, if any nodes return invalid.
/// - Syncing, if any nodes return syncing.
/// - An error, if all nodes return an error.
pub async fn notify_forkchoice_updated(
&self,
head_block_hash: Hash256,
finalized_block_hash: Hash256,
payload_attributes: Option<PayloadAttributes>,
) -> Result<(), Error> {
) -> Result<(PayloadStatusV1Status, Option<Vec<Hash256>>), Error> {
debug!(
self.log(),
"Issuing engine_forkchoiceUpdated";
@ -577,13 +597,76 @@ impl ExecutionLayer {
})
.await;
if broadcast_results.iter().any(Result::is_ok) {
Ok(())
let mut errors = vec![];
let mut valid = 0;
let mut invalid = 0;
let mut syncing = 0;
let mut invalid_latest_valid_hash = HashSet::new();
for result in broadcast_results {
match result {
Ok(response) => match (&response.payload_status.latest_valid_hash, &response.payload_status.status) {
// TODO(bellatrix) a strict interpretation of the v1.0.0.alpha.6 spec says that
// `latest_valid_hash` *cannot* be `None`. However, we accept it to maintain
// Geth compatibility for the short term. See:
//
// https://github.com/ethereum/go-ethereum/issues/24404
(None, &PayloadStatusV1Status::Valid) => valid += 1,
(Some(latest_hash), &PayloadStatusV1Status::Valid) => {
if latest_hash == &head_block_hash {
valid += 1;
} else {
errors.push(EngineError::Api {
id: "unknown".to_string(),
error: engine_api::Error::BadResponse(
format!(
"forkchoice_updated: payload_status = Valid but invalid latest_valid_hash. Expected({:?}) Found({:?})",
head_block_hash,
*latest_hash,
)
),
});
}
}
(Some(latest_hash), &PayloadStatusV1Status::Invalid) => {
invalid += 1;
invalid_latest_valid_hash.insert(*latest_hash);
}
(None, &PayloadStatusV1Status::InvalidTerminalBlock) => invalid += 1,
(None, &PayloadStatusV1Status::Syncing) => syncing += 1,
_ => {
errors.push(EngineError::Api {
id: "unknown".to_string(),
error: engine_api::Error::BadResponse(format!(
"forkchoice_updated: response does not conform to engine API spec: {:?}",
response
)),
})
}
}
Err(e) => errors.push(e),
}
}
if valid > 0 && invalid > 0 {
crit!(
self.log(),
"Consensus failure between execution nodes";
"method" => "forkchoice_updated"
);
// In this situation, better to have a failure of liveness than vote on a potentially invalid chain
return Err(Error::ConsensusFailure);
}
if valid > 0 {
Ok((PayloadStatusV1Status::Valid, Some(vec![head_block_hash])))
} else if invalid > 0 {
Ok((
PayloadStatusV1Status::Invalid,
Some(invalid_latest_valid_hash.into_iter().collect()),
))
} else if syncing > 0 {
Ok((PayloadStatusV1Status::Syncing, None))
} else {
let errors = broadcast_results
.into_iter()
.filter_map(Result::err)
.collect();
Err(Error::EngineErrors(errors))
}
}

View File

@ -1,6 +1,5 @@
use crate::engine_api::{
ExecutePayloadResponse, ExecutePayloadResponseStatus, ExecutionBlock, PayloadAttributes,
PayloadId,
ExecutionBlock, PayloadAttributes, PayloadId, PayloadStatusV1, PayloadStatusV1Status,
};
use crate::engines::ForkChoiceState;
use serde::{Deserialize, Serialize};
@ -235,20 +234,20 @@ impl<T: EthSpec> ExecutionBlockGenerator<T> {
self.payload_ids.remove(id)
}
pub fn notify_new_payload(&mut self, payload: ExecutionPayload<T>) -> ExecutePayloadResponse {
pub fn new_payload(&mut self, payload: ExecutionPayload<T>) -> PayloadStatusV1 {
let parent = if let Some(parent) = self.blocks.get(&payload.parent_hash) {
parent
} else {
return ExecutePayloadResponse {
status: ExecutePayloadResponseStatus::Syncing,
return PayloadStatusV1 {
status: PayloadStatusV1Status::Syncing,
latest_valid_hash: None,
validation_error: None,
};
};
if payload.block_number != parent.block_number() + 1 {
return ExecutePayloadResponse {
status: ExecutePayloadResponseStatus::Invalid,
return PayloadStatusV1 {
status: PayloadStatusV1Status::Invalid,
latest_valid_hash: Some(parent.block_hash()),
validation_error: Some("invalid block number".to_string()),
};
@ -257,8 +256,8 @@ impl<T: EthSpec> ExecutionBlockGenerator<T> {
let valid_hash = payload.block_hash;
self.pending_payloads.insert(payload.block_hash, payload);
ExecutePayloadResponse {
status: ExecutePayloadResponseStatus::Valid,
PayloadStatusV1 {
status: PayloadStatusV1Status::Valid,
latest_valid_hash: Some(valid_hash),
validation_error: None,
}

View File

@ -1,5 +1,5 @@
use super::Context;
use crate::engine_api::{http::*, ExecutePayloadResponse, ExecutePayloadResponseStatus};
use crate::engine_api::{http::*, PayloadStatusV1, PayloadStatusV1Status};
use crate::json_structures::*;
use serde::de::DeserializeOwned;
use serde_json::Value as JsonValue;
@ -54,30 +54,30 @@ pub async fn handle_rpc<T: EthSpec>(
)
.unwrap())
}
ENGINE_EXECUTE_PAYLOAD_V1 => {
ENGINE_NEW_PAYLOAD_V1 => {
let request: JsonExecutionPayloadV1<T> = get_param(params, 0)?;
let response = if let Some(status) = *ctx.static_notify_new_payload_response.lock() {
let response = if let Some(status) = *ctx.static_new_payload_response.lock() {
match status {
ExecutePayloadResponseStatus::Valid => ExecutePayloadResponse {
PayloadStatusV1Status::Valid => PayloadStatusV1 {
status,
latest_valid_hash: Some(request.block_hash),
validation_error: None,
},
ExecutePayloadResponseStatus::Syncing => ExecutePayloadResponse {
PayloadStatusV1Status::Syncing => PayloadStatusV1 {
status,
latest_valid_hash: None,
validation_error: None,
},
_ => unimplemented!("invalid static executePayloadResponse"),
_ => unimplemented!("invalid static newPayloadResponse"),
}
} else {
ctx.execution_block_generator
.write()
.notify_new_payload(request.into())
.new_payload(request.into())
};
Ok(serde_json::to_value(JsonExecutePayloadV1Response::from(response)).unwrap())
Ok(serde_json::to_value(JsonPayloadStatusV1::from(response)).unwrap())
}
ENGINE_GET_PAYLOAD_V1 => {
let request: JsonPayloadIdRequest = get_param(params, 0)?;
@ -94,6 +94,8 @@ pub async fn handle_rpc<T: EthSpec>(
ENGINE_FORKCHOICE_UPDATED_V1 => {
let forkchoice_state: JsonForkChoiceStateV1 = get_param(params, 0)?;
let payload_attributes: Option<JsonPayloadAttributesV1> = get_param(params, 1)?;
let head_block_hash = forkchoice_state.head_block_hash;
let id = ctx
.execution_block_generator
.write()
@ -103,7 +105,11 @@ pub async fn handle_rpc<T: EthSpec>(
)?;
Ok(serde_json::to_value(JsonForkchoiceUpdatedV1Response {
status: JsonForkchoiceUpdatedV1ResponseStatus::Success,
payload_status: JsonPayloadStatusV1 {
status: JsonPayloadStatusV1Status::Valid,
latest_valid_hash: Some(head_block_hash),
validation_error: None,
},
payload_id: id.map(Into::into),
})
.unwrap())

View File

@ -147,8 +147,8 @@ impl<T: EthSpec> MockExecutionLayer<T> {
let (payload_response, latest_valid_hash) =
self.el.notify_new_payload(&payload).await.unwrap();
assert_eq!(payload_response, ExecutePayloadResponseStatus::Valid);
assert_eq!(latest_valid_hash, Some(payload.block_hash));
assert_eq!(payload_response, PayloadStatusV1Status::Valid);
assert_eq!(latest_valid_hash, Some(vec![payload.block_hash]));
self.el
.notify_forkchoice_updated(block_hash, Hash256::zero(), None)

View File

@ -1,7 +1,7 @@
//! Provides a mock execution engine HTTP JSON-RPC API for use in testing.
use crate::engine_api::http::JSONRPC_VERSION;
use crate::engine_api::ExecutePayloadResponseStatus;
use crate::engine_api::PayloadStatusV1Status;
use bytes::Bytes;
use environment::null_logger;
use execution_block_generator::{Block, PoWBlock};
@ -62,7 +62,7 @@ impl<T: EthSpec> MockServer<T> {
last_echo_request: last_echo_request.clone(),
execution_block_generator: RwLock::new(execution_block_generator),
preloaded_responses,
static_notify_new_payload_response: <_>::default(),
static_new_payload_response: <_>::default(),
_phantom: PhantomData,
});
@ -117,8 +117,7 @@ impl<T: EthSpec> MockServer<T> {
}
pub fn all_payloads_valid(&self) {
*self.ctx.static_notify_new_payload_response.lock() =
Some(ExecutePayloadResponseStatus::Valid)
*self.ctx.static_new_payload_response.lock() = Some(PayloadStatusV1Status::Valid)
}
pub fn insert_pow_block(
@ -188,7 +187,7 @@ pub struct Context<T: EthSpec> {
pub last_echo_request: Arc<RwLock<Option<Bytes>>>,
pub execution_block_generator: RwLock<ExecutionBlockGenerator<T>>,
pub preloaded_responses: Arc<Mutex<Vec<serde_json::Value>>>,
pub static_notify_new_payload_response: Arc<Mutex<Option<ExecutePayloadResponseStatus>>>,
pub static_new_payload_response: Arc<Mutex<Option<PayloadStatusV1Status>>>,
pub _phantom: PhantomData<T>,
}

View File

@ -39,6 +39,7 @@ regex = "1.3.9"
strum = { version = "0.21.0", features = ["derive"] }
superstruct = "0.4.0"
prometheus-client = "0.15.0"
unused_port = { path = "../../common/unused_port" }
[dependencies.libp2p]
git = "https://github.com/sigp/rust-libp2p"

View File

@ -1049,17 +1049,11 @@ mod tests {
use crate::rpc::methods::{MetaData, MetaDataV2};
use enr::EnrBuilder;
use slog::{o, Drain};
use std::net::UdpSocket;
use types::{BitVector, MinimalEthSpec, SubnetId};
use unused_port::unused_udp_port;
type E = MinimalEthSpec;
pub fn unused_port() -> u16 {
let socket = UdpSocket::bind("127.0.0.1:0").expect("should create udp socket");
let local_addr = socket.local_addr().expect("should read udp socket");
local_addr.port()
}
pub fn build_log(level: slog::Level, enabled: bool) -> slog::Logger {
let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::FullFormat::new(decorator).build().fuse();
@ -1075,7 +1069,7 @@ mod tests {
async fn build_discovery() -> Discovery<E> {
let keypair = libp2p::identity::Keypair::generate_secp256k1();
let config = NetworkConfig {
discovery_port: unused_port(),
discovery_port: unused_udp_port().unwrap(),
..Default::default()
};
let enr_key: CombinedKey = CombinedKey::from_libp2p(&keypair).unwrap();

View File

@ -6,12 +6,12 @@ use lighthouse_network::Multiaddr;
use lighthouse_network::Service as LibP2PService;
use lighthouse_network::{Libp2pEvent, NetworkConfig};
use slog::{debug, error, o, Drain};
use std::net::{TcpListener, UdpSocket};
use std::sync::Arc;
use std::sync::Weak;
use std::time::Duration;
use tokio::runtime::Runtime;
use types::{ChainSpec, EnrForkId, EthSpec, ForkContext, Hash256, MinimalEthSpec};
use unused_port::unused_tcp_port;
#[allow(clippy::type_complexity)]
#[allow(unused)]
@ -61,38 +61,6 @@ pub fn build_log(level: slog::Level, enabled: bool) -> slog::Logger {
}
}
// A bit of hack to find an unused port.
///
/// Does not guarantee that the given port is unused after the function exits, just that it was
/// unused before the function started (i.e., it does not reserve a port).
pub fn unused_port(transport: &str) -> Result<u16, String> {
let local_addr = match transport {
"tcp" => {
let listener = TcpListener::bind("127.0.0.1:0").map_err(|e| {
format!("Failed to create TCP listener to find unused port: {:?}", e)
})?;
listener.local_addr().map_err(|e| {
format!(
"Failed to read TCP listener local_addr to find unused port: {:?}",
e
)
})?
}
"udp" => {
let socket = UdpSocket::bind("127.0.0.1:0")
.map_err(|e| format!("Failed to create UDP socket to find unused port: {:?}", e))?;
socket.local_addr().map_err(|e| {
format!(
"Failed to read UDP socket local_addr to find unused port: {:?}",
e
)
})?
}
_ => return Err("Invalid transport to find unused port".into()),
};
Ok(local_addr.port())
}
pub fn build_config(port: u16, mut boot_nodes: Vec<Enr>) -> NetworkConfig {
let mut config = NetworkConfig::default();
let path = TempBuilder::new()
@ -121,7 +89,7 @@ pub async fn build_libp2p_instance(
boot_nodes: Vec<Enr>,
log: slog::Logger,
) -> Libp2pInstance {
let port = unused_port("tcp").unwrap();
let port = unused_tcp_port().unwrap();
let config = build_config(port, boot_nodes);
// launch libp2p service

View File

@ -11,10 +11,10 @@ use std::cmp;
use std::cmp::max;
use std::fs;
use std::net::{IpAddr, Ipv4Addr, ToSocketAddrs};
use std::net::{TcpListener, UdpSocket};
use std::path::{Path, PathBuf};
use std::str::FromStr;
use types::{Checkpoint, Epoch, EthSpec, Hash256, PublicKeyBytes, GRAFFITI_BYTES_LEN};
use unused_port::{unused_tcp_port, unused_udp_port};
/// Gets the fully-initialized global client.
///
@ -293,9 +293,9 @@ pub fn get_config<E: EthSpec>(
client_config.network.enr_address = None
}
client_config.network.libp2p_port =
unused_port("tcp").map_err(|e| format!("Failed to get port for libp2p: {}", e))?;
unused_tcp_port().map_err(|e| format!("Failed to get port for libp2p: {}", e))?;
client_config.network.discovery_port =
unused_port("udp").map_err(|e| format!("Failed to get port for discovery: {}", e))?;
unused_udp_port().map_err(|e| format!("Failed to get port for discovery: {}", e))?;
client_config.http_api.listen_port = 0;
client_config.http_metrics.listen_port = 0;
}
@ -785,44 +785,3 @@ pub fn get_data_dir(cli_args: &ArgMatches) -> PathBuf {
})
.unwrap_or_else(|| PathBuf::from("."))
}
/// A bit of hack to find an unused port.
///
/// Does not guarantee that the given port is unused after the function exits, just that it was
/// unused before the function started (i.e., it does not reserve a port).
///
/// Used for passing unused ports to libp2 so that lighthouse won't have to update
/// its own ENR.
///
/// NOTE: It is possible that libp2p/discv5 is unable to bind to the
/// ports returned by this function as the OS has a buffer period where
/// it doesn't allow binding to the same port even after the socket is closed.
/// We might have to use SO_REUSEADDR socket option from `std::net2` crate in
/// that case.
pub fn unused_port(transport: &str) -> Result<u16, String> {
let local_addr = match transport {
"tcp" => {
let listener = TcpListener::bind("127.0.0.1:0").map_err(|e| {
format!("Failed to create TCP listener to find unused port: {:?}", e)
})?;
listener.local_addr().map_err(|e| {
format!(
"Failed to read TCP listener local_addr to find unused port: {:?}",
e
)
})?
}
"udp" => {
let socket = UdpSocket::bind("127.0.0.1:0")
.map_err(|e| format!("Failed to create UDP socket to find unused port: {:?}", e))?;
socket.local_addr().map_err(|e| {
format!(
"Failed to read UDP socket local_addr to find unused port: {:?}",
e
)
})?
}
_ => return Err("Invalid transport to find unused port".into()),
};
Ok(local_addr.port())
}

View File

@ -0,0 +1,8 @@
[package]
name = "unused_port"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]

View File

@ -0,0 +1,55 @@
use std::net::{TcpListener, UdpSocket};
#[derive(Copy, Clone)]
pub enum Transport {
Tcp,
Udp,
}
/// A convenience function for `unused_port(Transport::Tcp)`.
pub fn unused_tcp_port() -> Result<u16, String> {
unused_port(Transport::Tcp)
}
/// A convenience function for `unused_port(Transport::Tcp)`.
pub fn unused_udp_port() -> Result<u16, String> {
unused_port(Transport::Udp)
}
/// A bit of hack to find an unused port.
///
/// Does not guarantee that the given port is unused after the function exits, just that it was
/// unused before the function started (i.e., it does not reserve a port).
///
/// ## Notes
///
/// It is possible that users are unable to bind to the ports returned by this function as the OS
/// has a buffer period where it doesn't allow binding to the same port even after the socket is
/// closed. We might have to use SO_REUSEADDR socket option from `std::net2` crate in that case.
pub fn unused_port(transport: Transport) -> Result<u16, String> {
let local_addr = match transport {
Transport::Tcp => {
let listener = TcpListener::bind("127.0.0.1:0").map_err(|e| {
format!("Failed to create TCP listener to find unused port: {:?}", e)
})?;
listener.local_addr().map_err(|e| {
format!(
"Failed to read TCP listener local_addr to find unused port: {:?}",
e
)
})?
}
Transport::Udp => {
let socket = UdpSocket::bind("127.0.0.1:0")
.map_err(|e| format!("Failed to create UDP socket to find unused port: {:?}", e))?;
socket.local_addr().map_err(|e| {
format!(
"Failed to read UDP socket local_addr to find unused port: {:?}",
e
)
})?
}
};
Ok(local_addr.port())
}

View File

@ -44,6 +44,7 @@ serde_json = "1.0.59"
task_executor = { path = "../common/task_executor" }
malloc_utils = { path = "../common/malloc_utils" }
directory = { path = "../common/directory" }
unused_port = { path = "../common/unused_port" }
[dev-dependencies]
tempfile = "3.1.0"

View File

@ -5,13 +5,13 @@ use lighthouse_network::PeerId;
use std::fs::File;
use std::io::Write;
use std::net::{IpAddr, Ipv4Addr};
use std::net::{TcpListener, UdpSocket};
use std::path::PathBuf;
use std::process::Command;
use std::str::FromStr;
use std::string::ToString;
use tempfile::TempDir;
use types::{Address, Checkpoint, Epoch, Hash256};
use unused_port::{unused_tcp_port, unused_udp_port};
const DEFAULT_ETH1_ENDPOINT: &str = "http://localhost:8545/";
@ -279,7 +279,7 @@ fn network_listen_address_flag() {
}
#[test]
fn network_port_flag() {
let port = unused_port("tcp").expect("Unable to find unused port.");
let port = unused_tcp_port().expect("Unable to find unused port.");
CommandLineTest::new()
.flag("port", Some(port.to_string().as_str()))
.run()
@ -290,8 +290,8 @@ fn network_port_flag() {
}
#[test]
fn network_port_and_discovery_port_flags() {
let port1 = unused_port("tcp").expect("Unable to find unused port.");
let port2 = unused_port("udp").expect("Unable to find unused port.");
let port1 = unused_tcp_port().expect("Unable to find unused port.");
let port2 = unused_udp_port().expect("Unable to find unused port.");
CommandLineTest::new()
.flag("port", Some(port1.to_string().as_str()))
.flag("discovery-port", Some(port2.to_string().as_str()))
@ -414,7 +414,7 @@ fn zero_ports_flag() {
// Tests for ENR flags.
#[test]
fn enr_udp_port_flags() {
let port = unused_port("udp").expect("Unable to find unused port.");
let port = unused_udp_port().expect("Unable to find unused port.");
CommandLineTest::new()
.flag("enr-udp-port", Some(port.to_string().as_str()))
.run_with_zero_port()
@ -422,7 +422,7 @@ fn enr_udp_port_flags() {
}
#[test]
fn enr_tcp_port_flags() {
let port = unused_port("tcp").expect("Unable to find unused port.");
let port = unused_tcp_port().expect("Unable to find unused port.");
CommandLineTest::new()
.flag("enr-tcp-port", Some(port.to_string().as_str()))
.run_with_zero_port()
@ -431,8 +431,8 @@ fn enr_tcp_port_flags() {
#[test]
fn enr_match_flag() {
let addr = "127.0.0.2".parse::<IpAddr>().unwrap();
let port1 = unused_port("udp").expect("Unable to find unused port.");
let port2 = unused_port("udp").expect("Unable to find unused port.");
let port1 = unused_udp_port().expect("Unable to find unused port.");
let port2 = unused_udp_port().expect("Unable to find unused port.");
CommandLineTest::new()
.flag("enr-match", None)
.flag("listen-address", Some("127.0.0.2"))
@ -449,7 +449,7 @@ fn enr_match_flag() {
#[test]
fn enr_address_flag() {
let addr = "192.167.1.1".parse::<IpAddr>().unwrap();
let port = unused_port("udp").expect("Unable to find unused port.");
let port = unused_udp_port().expect("Unable to find unused port.");
CommandLineTest::new()
.flag("enr-address", Some("192.167.1.1"))
.flag("enr-udp-port", Some(port.to_string().as_str()))
@ -463,7 +463,7 @@ fn enr_address_flag() {
fn enr_address_dns_flag() {
let addr = "127.0.0.1".parse::<IpAddr>().unwrap();
let ipv6addr = "::1".parse::<IpAddr>().unwrap();
let port = unused_port("udp").expect("Unable to find unused port.");
let port = unused_udp_port().expect("Unable to find unused port.");
CommandLineTest::new()
.flag("enr-address", Some("localhost"))
.flag("enr-udp-port", Some(port.to_string().as_str()))
@ -502,8 +502,8 @@ fn http_address_flag() {
}
#[test]
fn http_port_flag() {
let port1 = unused_port("tcp").expect("Unable to find unused port.");
let port2 = unused_port("tcp").expect("Unable to find unused port.");
let port1 = unused_tcp_port().expect("Unable to find unused port.");
let port2 = unused_tcp_port().expect("Unable to find unused port.");
CommandLineTest::new()
.flag("http-port", Some(port1.to_string().as_str()))
.flag("port", Some(port2.to_string().as_str()))
@ -573,8 +573,8 @@ fn metrics_address_flag() {
}
#[test]
fn metrics_port_flag() {
let port1 = unused_port("tcp").expect("Unable to find unused port.");
let port2 = unused_port("tcp").expect("Unable to find unused port.");
let port1 = unused_tcp_port().expect("Unable to find unused port.");
let port2 = unused_tcp_port().expect("Unable to find unused port.");
CommandLineTest::new()
.flag("metrics", None)
.flag("metrics-port", Some(port1.to_string().as_str()))
@ -856,35 +856,3 @@ fn ensure_panic_on_failed_launch() {
assert_eq!(slasher_config.chunk_size, 10);
});
}
/// A bit of hack to find an unused port.
///
/// Does not guarantee that the given port is unused after the function exits, just that it was
/// unused before the function started (i.e., it does not reserve a port).
pub fn unused_port(transport: &str) -> Result<u16, String> {
let local_addr = match transport {
"tcp" => {
let listener = TcpListener::bind("127.0.0.1:0").map_err(|e| {
format!("Failed to create TCP listener to find unused port: {:?}", e)
})?;
listener.local_addr().map_err(|e| {
format!(
"Failed to read TCP listener local_addr to find unused port: {:?}",
e
)
})?
}
"udp" => {
let socket = UdpSocket::bind("127.0.0.1:0")
.map_err(|e| format!("Failed to create UDP socket to find unused port: {:?}", e))?;
socket.local_addr().map_err(|e| {
format!(
"Failed to read UDP socket local_addr to find unused port: {:?}",
e
)
})?
}
_ => return Err("Invalid transport to find unused port".into()),
};
Ok(local_addr.port())
}

View File

@ -7,11 +7,12 @@ use lighthouse_network::discovery::ENR_FILENAME;
use lighthouse_network::Enr;
use std::fs::File;
use std::io::Write;
use std::net::{Ipv4Addr, UdpSocket};
use std::net::Ipv4Addr;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::str::FromStr;
use tempfile::TempDir;
use unused_port::unused_udp_port;
const IP_ADDRESS: &str = "192.168.2.108";
@ -51,15 +52,6 @@ impl CommandLineTestExec for CommandLineTest {
}
}
fn unused_port() -> u16 {
let socket =
UdpSocket::bind("127.0.0.1:0").expect("should create udp socket to find unused port");
let local_addr = socket
.local_addr()
.expect("should read udp socket to find unused port");
local_addr.port()
}
#[test]
fn enr_address_arg() {
let mut test = CommandLineTest::new();
@ -70,7 +62,7 @@ fn enr_address_arg() {
#[test]
fn port_flag() {
let port = unused_port();
let port = unused_udp_port().unwrap();
CommandLineTest::new()
.flag("port", Some(port.to_string().as_str()))
.run_with_ip()
@ -130,7 +122,7 @@ fn boot_nodes_flag() {
#[test]
fn enr_port_flag() {
let port = unused_port();
let port = unused_udp_port().unwrap();
CommandLineTest::new()
.flag("enr-port", Some(port.to_string().as_str()))
.run_with_ip()

View File

@ -10,3 +10,4 @@ web3 = { version = "0.17.0", default-features = false, features = ["http-tls", "
types = { path = "../../consensus/types"}
serde_json = "1.0.58"
deposit_contract = { path = "../../common/deposit_contract"}
unused_port = { path = "../../common/unused_port" }

View File

@ -1,9 +1,9 @@
use serde_json::json;
use std::io::prelude::*;
use std::io::BufReader;
use std::net::TcpListener;
use std::process::{Child, Command, Stdio};
use std::time::{Duration, Instant};
use unused_port::unused_tcp_port;
use web3::{transports::Http, Transport, Web3};
/// How long we will wait for ganache to indicate that it is ready.
@ -72,7 +72,7 @@ impl GanacheInstance {
/// Start a new `ganache-cli` process, waiting until it indicates that it is ready to accept
/// RPC connections.
pub fn new(network_id: u64, chain_id: u64) -> Result<Self, String> {
let port = unused_port()?;
let port = unused_tcp_port()?;
let binary = match cfg!(windows) {
true => "ganache-cli.cmd",
false => "ganache-cli",
@ -108,7 +108,7 @@ impl GanacheInstance {
}
pub fn fork(&self) -> Result<Self, String> {
let port = unused_port()?;
let port = unused_tcp_port()?;
let binary = match cfg!(windows) {
true => "ganache-cli.cmd",
false => "ganache-cli",
@ -188,24 +188,6 @@ fn endpoint(port: u16) -> String {
format!("http://localhost:{}", port)
}
/// A bit of hack to find an unused TCP port.
///
/// Does not guarantee that the given port is unused after the function exists, just that it was
/// unused before the function started (i.e., it does not reserve a port).
pub fn unused_port() -> Result<u16, String> {
let listener = TcpListener::bind("127.0.0.1:0")
.map_err(|e| format!("Failed to create TCP listener to find unused port: {:?}", e))?;
let local_addr = listener.local_addr().map_err(|e| {
format!(
"Failed to read TCP listener local_addr to find unused port: {:?}",
e
)
})?;
Ok(local_addr.port())
}
impl Drop for GanacheInstance {
fn drop(&mut self) {
if cfg!(windows) {

View File

@ -0,0 +1 @@
execution_clients/

View File

@ -0,0 +1,19 @@
[package]
name = "execution_engine_integration"
version = "0.1.0"
edition = "2021"
build = "build.rs"
[dependencies]
tempfile = "3.1.0"
serde_json = "1.0.58"
task_executor = { path = "../../common/task_executor" }
tokio = { version = "1.14.0", features = ["rt-multi-thread", "macros"] }
futures = "0.3.7"
exit-future = "0.2.0"
environment = { path = "../../lighthouse/environment" }
execution_layer = { path = "../../beacon_node/execution_layer" }
sensitive_url = { path = "../../common/sensitive_url" }
types = { path = "../../consensus/types" }
unused_port = { path = "../../common/unused_port" }

View File

@ -0,0 +1,5 @@
test:
cargo test --release --locked
clean:
rm -rf execution_clients

View File

@ -0,0 +1,62 @@
use std::env;
use std::fs;
use std::path::{Path, PathBuf};
use std::process::Command;
const GETH_BRANCH: &str = "merge-kiln";
const GETH_REPO_URL: &str = "https://github.com/MariusVanDerWijden/go-ethereum";
fn main() {
let manifest_dir: PathBuf = env::var("CARGO_MANIFEST_DIR").unwrap().into();
let execution_clients_dir = manifest_dir.join("execution_clients");
if !execution_clients_dir.exists() {
fs::create_dir(&execution_clients_dir).unwrap();
}
build_geth(&execution_clients_dir);
}
fn build_geth(execution_clients_dir: &Path) {
let repo_dir = execution_clients_dir.join("go-ethereum");
if !repo_dir.exists() {
// Clone the repo
assert!(Command::new("git")
.arg("clone")
.arg(GETH_REPO_URL)
.current_dir(&execution_clients_dir)
.output()
.expect("failed to clone geth repo")
.status
.success());
}
// Checkout the correct branch
assert!(Command::new("git")
.arg("checkout")
.arg(GETH_BRANCH)
.current_dir(&repo_dir)
.output()
.expect("failed to checkout geth branch")
.status
.success());
// Update the branch
assert!(Command::new("git")
.arg("pull")
.current_dir(&repo_dir)
.output()
.expect("failed to update geth branch")
.status
.success());
// Build geth
assert!(Command::new("make")
.arg("geth")
.current_dir(&repo_dir)
.output()
.expect("failed to make geth")
.status
.success());
}

View File

@ -0,0 +1,131 @@
use crate::{genesis_json::geth_genesis_json, SUPPRESS_LOGS};
use sensitive_url::SensitiveUrl;
use std::path::PathBuf;
use std::process::{Child, Command, Output, Stdio};
use std::{env, fs::File};
use tempfile::TempDir;
use unused_port::unused_tcp_port;
/// Defined for each EE type (e.g., Geth, Nethermind, etc).
pub trait GenericExecutionEngine: Clone {
fn init_datadir() -> TempDir;
fn start_client(datadir: &TempDir, http_port: u16) -> Child;
}
/// Holds handle to a running EE process, plus some other metadata.
pub struct ExecutionEngine<E> {
#[allow(dead_code)]
engine: E,
#[allow(dead_code)]
datadir: TempDir,
http_port: u16,
child: Child,
}
impl<E> Drop for ExecutionEngine<E> {
fn drop(&mut self) {
// Ensure the EE process is killed on drop.
if let Err(e) = self.child.kill() {
eprintln!("failed to kill child: {:?}", e)
}
}
}
impl<E: GenericExecutionEngine> ExecutionEngine<E> {
pub fn new(engine: E) -> Self {
let datadir = E::init_datadir();
let http_port = unused_tcp_port().unwrap();
let child = E::start_client(&datadir, http_port);
Self {
engine,
datadir,
http_port,
child,
}
}
pub fn http_url(&self) -> SensitiveUrl {
SensitiveUrl::parse(&format!("http://127.0.0.1:{}", self.http_port)).unwrap()
}
}
/*
* Geth-specific Implementation
*/
#[derive(Clone)]
pub struct Geth;
impl Geth {
fn binary_path() -> PathBuf {
let manifest_dir: PathBuf = env::var("CARGO_MANIFEST_DIR").unwrap().into();
manifest_dir
.join("execution_clients")
.join("go-ethereum")
.join("build")
.join("bin")
.join("geth")
}
}
impl GenericExecutionEngine for Geth {
fn init_datadir() -> TempDir {
let datadir = TempDir::new().unwrap();
let genesis_json_path = datadir.path().join("genesis.json");
let mut file = File::create(&genesis_json_path).unwrap();
let json = geth_genesis_json();
serde_json::to_writer(&mut file, &json).unwrap();
let output = Command::new(Self::binary_path())
.arg("--datadir")
.arg(datadir.path().to_str().unwrap())
.arg("init")
.arg(genesis_json_path.to_str().unwrap())
.output()
.expect("failed to init geth");
check_command_output(output, "geth init failed");
datadir
}
fn start_client(datadir: &TempDir, http_port: u16) -> Child {
let network_port = unused_tcp_port().unwrap();
Command::new(Self::binary_path())
.arg("--datadir")
.arg(datadir.path().to_str().unwrap())
.arg("--http")
.arg("--http.api")
.arg("engine,eth")
.arg("--http.port")
.arg(http_port.to_string())
.arg("--port")
.arg(network_port.to_string())
.stdout(build_stdio())
.stderr(build_stdio())
.spawn()
.expect("failed to start beacon node")
}
}
fn check_command_output(output: Output, failure_msg: &'static str) {
if !output.status.success() {
let stdout = String::from_utf8_lossy(&output.stdout);
let stderr = String::from_utf8_lossy(&output.stderr);
dbg!(stdout);
dbg!(stderr);
panic!("{}", failure_msg);
}
}
/// Builds the stdout/stderr handler for commands which might output to the terminal.
fn build_stdio() -> Stdio {
if SUPPRESS_LOGS {
Stdio::null()
} else {
Stdio::inherit()
}
}

View File

@ -0,0 +1,42 @@
use serde_json::{json, Value};
/// Sourced from:
///
/// https://notes.ethereum.org/rmVErCfCRPKGqGkUe89-Kg
pub fn geth_genesis_json() -> Value {
json!({
"config": {
"chainId":1,
"homesteadBlock":0,
"eip150Block":0,
"eip155Block":0,
"eip158Block":0,
"byzantiumBlock":0,
"constantinopleBlock":0,
"petersburgBlock":0,
"istanbulBlock":0,
"muirGlacierBlock":0,
"berlinBlock":0,
"londonBlock":0,
"clique": {
"period": 5,
"epoch": 30000
},
"terminalTotalDifficulty":0
},
"nonce":"0x42",
"timestamp":"0x0",
"extraData":"0x0000000000000000000000000000000000000000000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
"gasLimit":"0x1C9C380",
"difficulty":"0x400000000",
"mixHash":"0x0000000000000000000000000000000000000000000000000000000000000000",
"coinbase":"0x0000000000000000000000000000000000000000",
"alloc":{
"0xa94f5374fce5edbc8e2a8697c15331677e6ebf0b":{"balance":"0x6d6172697573766477000000"}
},
"number":"0x0",
"gasUsed":"0x0",
"parentHash":"0x0000000000000000000000000000000000000000000000000000000000000000",
"baseFeePerGas":"0x7"
})
}

View File

@ -0,0 +1,12 @@
/// This library provides integration testing between Lighthouse and other execution engines.
///
/// See the `tests/tests.rs` file to run tests.
mod execution_engine;
mod genesis_json;
mod test_rig;
pub use execution_engine::Geth;
pub use test_rig::TestRig;
/// Set to `false` to send logs to the console during tests. Logs are useful when debugging.
const SUPPRESS_LOGS: bool = true;

View File

@ -0,0 +1,363 @@
use crate::execution_engine::{ExecutionEngine, GenericExecutionEngine};
use execution_layer::{ExecutionLayer, PayloadAttributes, PayloadStatusV1Status};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use task_executor::TaskExecutor;
use tokio::time::sleep;
use types::{Address, ChainSpec, EthSpec, Hash256, MainnetEthSpec, Uint256};
const EXECUTION_ENGINE_START_TIMEOUT: Duration = Duration::from_secs(10);
struct ExecutionPair<E> {
/// The Lighthouse `ExecutionLayer` struct, connected to the `execution_engine` via HTTP.
execution_layer: ExecutionLayer,
/// A handle to external EE process, once this is dropped the process will be killed.
#[allow(dead_code)]
execution_engine: ExecutionEngine<E>,
}
/// A rig that holds two EE processes for testing.
///
/// There are two EEs held here so that we can test out-of-order application of payloads, and other
/// edge-cases.
pub struct TestRig<E> {
#[allow(dead_code)]
runtime: Arc<tokio::runtime::Runtime>,
ee_a: ExecutionPair<E>,
ee_b: ExecutionPair<E>,
spec: ChainSpec,
_runtime_shutdown: exit_future::Signal,
}
impl<E: GenericExecutionEngine> TestRig<E> {
pub fn new(generic_engine: E) -> Self {
let log = environment::null_logger().unwrap();
let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap(),
);
let (runtime_shutdown, exit) = exit_future::signal();
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let executor = TaskExecutor::new(Arc::downgrade(&runtime), exit, log.clone(), shutdown_tx);
let fee_recipient = None;
let ee_a = {
let execution_engine = ExecutionEngine::new(generic_engine.clone());
let urls = vec![execution_engine.http_url()];
let execution_layer =
ExecutionLayer::from_urls(urls, fee_recipient, executor.clone(), log.clone())
.unwrap();
ExecutionPair {
execution_engine,
execution_layer,
}
};
let ee_b = {
let execution_engine = ExecutionEngine::new(generic_engine);
let urls = vec![execution_engine.http_url()];
let execution_layer =
ExecutionLayer::from_urls(urls, fee_recipient, executor, log).unwrap();
ExecutionPair {
execution_engine,
execution_layer,
}
};
let mut spec = MainnetEthSpec::default_spec();
spec.terminal_total_difficulty = Uint256::zero();
Self {
runtime,
ee_a,
ee_b,
spec,
_runtime_shutdown: runtime_shutdown,
}
}
pub fn perform_tests_blocking(&self) {
self.ee_a
.execution_layer
.block_on_generic(|_| async { self.perform_tests().await })
.unwrap()
}
pub async fn wait_until_synced(&self) {
let start_instant = Instant::now();
for pair in [&self.ee_a, &self.ee_b] {
loop {
// Run the routine to check for online nodes.
pair.execution_layer.watchdog_task().await;
if pair.execution_layer.is_synced().await {
break;
} else if start_instant + EXECUTION_ENGINE_START_TIMEOUT > Instant::now() {
sleep(Duration::from_millis(500)).await;
} else {
panic!("timeout waiting for execution engines to come online")
}
}
}
}
pub async fn perform_tests(&self) {
self.wait_until_synced().await;
/*
* Read the terminal block hash from both pairs, check it's equal.
*/
let terminal_pow_block_hash = self
.ee_a
.execution_layer
.get_terminal_pow_block_hash(&self.spec)
.await
.unwrap()
.unwrap();
assert_eq!(
terminal_pow_block_hash,
self.ee_b
.execution_layer
.get_terminal_pow_block_hash(&self.spec)
.await
.unwrap()
.unwrap()
);
/*
* Execution Engine A:
*
* Produce a valid payload atop the terminal block.
*/
let parent_hash = terminal_pow_block_hash;
let timestamp = timestamp_now();
let random = Hash256::zero();
let finalized_block_hash = Hash256::zero();
let proposer_index = 0;
let valid_payload = self
.ee_a
.execution_layer
.get_payload::<MainnetEthSpec>(
parent_hash,
timestamp,
random,
finalized_block_hash,
proposer_index,
)
.await
.unwrap();
/*
* Execution Engine A:
*
* Indicate that the payload is the head of the chain, before submitting a
* `notify_new_payload`.
*/
let head_block_hash = valid_payload.block_hash;
let finalized_block_hash = Hash256::zero();
let payload_attributes = None;
let (status, _) = self
.ee_a
.execution_layer
.notify_forkchoice_updated(head_block_hash, finalized_block_hash, payload_attributes)
.await
.unwrap();
assert_eq!(status, PayloadStatusV1Status::Syncing);
/*
* Execution Engine A:
*
* Provide the valid payload back to the EE again.
*/
let (status, _) = self
.ee_a
.execution_layer
.notify_new_payload(&valid_payload)
.await
.unwrap();
assert_eq!(status, PayloadStatusV1Status::Valid);
/*
* Execution Engine A:
*
* Indicate that the payload is the head of the chain.
*
* Do not provide payload attributes (we'll test that later).
*/
let head_block_hash = valid_payload.block_hash;
let finalized_block_hash = Hash256::zero();
let payload_attributes = None;
let (status, _) = self
.ee_a
.execution_layer
.notify_forkchoice_updated(head_block_hash, finalized_block_hash, payload_attributes)
.await
.unwrap();
assert_eq!(status, PayloadStatusV1Status::Valid);
/*
* Execution Engine A:
*
* Provide an invalidated payload to the EE.
*/
let mut invalid_payload = valid_payload.clone();
invalid_payload.random = Hash256::from_low_u64_be(42);
let (status, _) = self
.ee_a
.execution_layer
.notify_new_payload(&invalid_payload)
.await
.unwrap();
assert!(matches!(
status,
PayloadStatusV1Status::Invalid | PayloadStatusV1Status::InvalidBlockHash
));
/*
* Execution Engine A:
*
* Produce another payload atop the previous one.
*/
let parent_hash = valid_payload.block_hash;
let timestamp = valid_payload.timestamp + 1;
let random = Hash256::zero();
let finalized_block_hash = Hash256::zero();
let proposer_index = 0;
let second_payload = self
.ee_a
.execution_layer
.get_payload::<MainnetEthSpec>(
parent_hash,
timestamp,
random,
finalized_block_hash,
proposer_index,
)
.await
.unwrap();
/*
* Execution Engine A:
*
* Provide the second payload back to the EE again.
*/
let (status, _) = self
.ee_a
.execution_layer
.notify_new_payload(&second_payload)
.await
.unwrap();
assert_eq!(status, PayloadStatusV1Status::Valid);
/*
* Execution Engine A:
*
* Indicate that the payload is the head of the chain, providing payload attributes.
*/
let head_block_hash = valid_payload.block_hash;
let finalized_block_hash = Hash256::zero();
let payload_attributes = Some(PayloadAttributes {
timestamp: second_payload.timestamp + 1,
random: Hash256::zero(),
suggested_fee_recipient: Address::zero(),
});
let (status, _) = self
.ee_a
.execution_layer
.notify_forkchoice_updated(head_block_hash, finalized_block_hash, payload_attributes)
.await
.unwrap();
assert_eq!(status, PayloadStatusV1Status::Valid);
/*
* Execution Engine B:
*
* Provide the second payload, without providing the first.
*/
let (status, _) = self
.ee_b
.execution_layer
.notify_new_payload(&second_payload)
.await
.unwrap();
assert_eq!(status, PayloadStatusV1Status::Syncing);
/*
* Execution Engine B:
*
* Set the second payload as the head, without providing payload attributes.
*/
let head_block_hash = second_payload.block_hash;
let finalized_block_hash = Hash256::zero();
let payload_attributes = None;
let (status, _) = self
.ee_b
.execution_layer
.notify_forkchoice_updated(head_block_hash, finalized_block_hash, payload_attributes)
.await
.unwrap();
assert_eq!(status, PayloadStatusV1Status::Syncing);
/*
* Execution Engine B:
*
* Provide the first payload to the EE.
*/
let (status, _) = self
.ee_b
.execution_layer
.notify_new_payload(&valid_payload)
.await
.unwrap();
assert_eq!(status, PayloadStatusV1Status::Valid);
/*
* Execution Engine B:
*
* Provide the second payload, now the first has been provided.
*/
let (status, _) = self
.ee_b
.execution_layer
.notify_new_payload(&second_payload)
.await
.unwrap();
assert_eq!(status, PayloadStatusV1Status::Valid);
/*
* Execution Engine B:
*
* Set the second payload as the head, without providing payload attributes.
*/
let head_block_hash = second_payload.block_hash;
let finalized_block_hash = Hash256::zero();
let payload_attributes = None;
let (status, _) = self
.ee_b
.execution_layer
.notify_forkchoice_updated(head_block_hash, finalized_block_hash, payload_attributes)
.await
.unwrap();
assert_eq!(status, PayloadStatusV1Status::Valid);
}
}
/// Returns the duration since the unix epoch.
pub fn timestamp_now() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_else(|_| Duration::from_secs(0))
.as_secs()
}

View File

@ -0,0 +1,16 @@
#[cfg(not(target_family = "windows"))]
mod not_windows {
use execution_engine_integration::{Geth, TestRig};
#[test]
fn geth() {
TestRig::new(Geth).perform_tests_blocking()
}
}
#[cfg(target_family = "windows")]
mod windows {
#[test]
fn all_tests_skipped_on_windows() {
//
}
}

View File

@ -45,7 +45,7 @@ mod tests {
/// assume it failed to start.
const UPCHECK_TIMEOUT: Duration = Duration::from_secs(20);
/// Set to `true` to send the Web3Signer logs to the console during tests. Logs are useful when
/// Set to `false` to send the Web3Signer logs to the console during tests. Logs are useful when
/// debugging.
const SUPPRESS_WEB3SIGNER_LOGS: bool = true;