Initial work to remove engines fallback from the execution_layer crate (#3257)

## Issue Addressed

Part of #3160 

## Proposed Changes
Use only the first URL given for the execution engine; if more than one is provided, log a warning.
This change only reduces the multiple-engine setup to a single engine. The amount of code cleanup that can and should follow is not small and would interfere with ongoing PRs, so I'm keeping the changes intentionally very minimal.
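
The core of the change is small. Here is a minimal sketch of the new endpoint selection (mirroring the `lib.rs` diff below); plain strings stand in for the crate's URL type, and a `&str` error stands in for `Error::NoEngines`:

```rust
use slog::{warn, Logger};

/// Minimal sketch of the new single-endpoint selection in `ExecutionLayer::new`
/// (see the lib.rs diff below). `urls` corresponds to `execution_endpoints`
/// from the `Config`.
fn select_execution_url(urls: Vec<String>, log: &Logger) -> Result<String, &'static str> {
    if urls.len() > 1 {
        warn!(log, "Only the first execution engine url will be used");
    }
    // Take the first URL; error out if none were supplied.
    urls.into_iter().next().ok_or("no execution engine urls provided")
}
```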

## Additional Info

Future work:
- In [`EngineError`](9c429d0764/beacon_node/execution_layer/src/engines.rs (L173-L177)), the `id` field is no longer needed since it carries no meaning now that there is a single engine.
- The [`first_success_without_retry`](9c429d0764/beacon_node/execution_layer/src/engines.rs (L348-L351)) function can return a single error.
- The [`first_success`](9c429d0764/beacon_node/execution_layer/src/engines.rs (L324)) function can return a single error.
- After the redundancy is removed for the builders, we can probably make [`EngineErrors`](9c429d0764/beacon_node/execution_layer/src/lib.rs (L69)) carry a single error.
- Merge the [`Engines`](9c429d0764/beacon_node/execution_layer/src/engines.rs (L161-L165)) and [`Engine`](9c429d0764/beacon_node/execution_layer/src/engines.rs (L62-L67)) structs (a rough sketch of the merged shape follows this list).
- Fix the associated configuration and CLI params. I'm not sure whether both are handled in https://github.com/sigp/lighthouse/pull/3214.

In general, I think these changes can be made incrementally in individual pull requests.
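
For illustration only, the end state these items point towards might look roughly like the following. This is a hypothetical sketch, not code from this PR; it reuses the crate's existing type names (`HttpJsonRpc`, `EngineState`, `ForkChoiceState`, `EngineApiError`):

```rust
// Hypothetical post-cleanup shape (not part of this PR). The `Engines`
// wrapper is folded into a single `Engine` that owns the API client, the
// cached forkchoice state, and the logger; all types here are the crate's own.
pub struct Engine {
    pub api: HttpJsonRpc<EngineApi>,
    pub state: RwLock<EngineState>,
    pub latest_forkchoice_state: RwLock<Option<ForkChoiceState>>,
    pub log: Logger,
}

// `EngineError` with the now-meaningless `id` field dropped, so fallible
// calls can return a single `EngineError` instead of a `Vec` of them.
pub enum EngineError {
    Offline,
    Api { error: EngineApiError },
    Auth,
}
```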
Commit 2063c0fa0d by Divma, 2022-06-22 14:27:16 +00:00 (parent 8faaa35b58)
2 changed files with 201 additions and 285 deletions

beacon_node/execution_layer/src/engines.rs

@@ -156,10 +156,11 @@ impl Builder for Engine<BuilderApi> {
}
}
/// Holds multiple execution engines and provides functionality for managing them in a fallback
/// manner.
// This structure used to hold multiple execution engines managed in a fallback manner. This
// functionality has been removed following https://github.com/sigp/lighthouse/issues/3118 and this
// struct will likely be removed in the future.
pub struct Engines {
pub engines: Vec<Engine<EngineApi>>,
pub engine: Engine<EngineApi>,
pub latest_forkchoice_state: RwLock<Option<ForkChoiceState>>,
pub log: Logger,
}
@@ -185,7 +186,7 @@ impl Engines {
*self.latest_forkchoice_state.write().await = Some(state);
}
async fn send_latest_forkchoice_state(&self, engine: &Engine<EngineApi>) {
async fn send_latest_forkchoice_state(&self) {
let latest_forkchoice_state = self.get_latest_forkchoice_state().await;
if let Some(forkchoice_state) = latest_forkchoice_state {
@@ -194,7 +195,7 @@ impl Engines {
self.log,
"No need to call forkchoiceUpdated";
"msg" => "head does not have execution enabled",
"id" => &engine.id,
"id" => &self.engine.id,
);
return;
}
@@ -203,12 +204,13 @@ impl Engines {
self.log,
"Issuing forkchoiceUpdated";
"forkchoice_state" => ?forkchoice_state,
"id" => &engine.id,
"id" => &self.engine.id,
);
// For simplicity, payload attributes are never included in this call. It may be
// reasonable to include them in the future.
if let Err(e) = engine
if let Err(e) = self
.engine
.api
.forkchoice_updated_v1(forkchoice_state, None)
.await
@@ -217,98 +219,77 @@ impl Engines {
self.log,
"Failed to issue latest head to engine";
"error" => ?e,
"id" => &engine.id,
"id" => &self.engine.id,
);
}
} else {
debug!(
self.log,
"No head, not sending to engine";
"id" => &engine.id,
"id" => &self.engine.id,
);
}
}
/// Returns `true` if there is at least one engine with a "synced" status.
pub async fn any_synced(&self) -> bool {
for engine in &self.engines {
if *engine.state.read().await == EngineState::Synced {
return true;
}
}
false
/// Returns `true` if the engine has a "synced" status.
pub async fn is_synced(&self) -> bool {
*self.engine.state.read().await == EngineState::Synced
}
/// Run the `EngineApi::upcheck` function on all nodes which are currently offline.
///
/// This can be used to try and recover any offline nodes.
/// Run the `EngineApi::upcheck` function if the node's last known state is not synced. This
/// might be used to recover the node if offline.
pub async fn upcheck_not_synced(&self, logging: Logging) {
let upcheck_futures = self.engines.iter().map(|engine| async move {
let mut state_lock = engine.state.write().await;
if *state_lock != EngineState::Synced {
match engine.api.upcheck().await {
Ok(()) => {
if logging.is_enabled() {
info!(
self.log,
"Execution engine online";
"id" => &engine.id
);
}
// Send the node our latest forkchoice_state.
self.send_latest_forkchoice_state(engine).await;
*state_lock = EngineState::Synced
let mut state_lock = self.engine.state.write().await;
if *state_lock != EngineState::Synced {
match self.engine.api.upcheck().await {
Ok(()) => {
if logging.is_enabled() {
info!(
self.log,
"Execution engine online";
);
}
Err(EngineApiError::IsSyncing) => {
if logging.is_enabled() {
warn!(
self.log,
"Execution engine syncing";
"id" => &engine.id
)
}
// Send the node our latest forkchoice_state.
self.send_latest_forkchoice_state().await;
// Send the node our latest forkchoice_state, it may assist with syncing.
self.send_latest_forkchoice_state(engine).await;
*state_lock = EngineState::Syncing
*state_lock = EngineState::Synced
}
Err(EngineApiError::IsSyncing) => {
if logging.is_enabled() {
warn!(
self.log,
"Execution engine syncing";
)
}
Err(EngineApiError::Auth(err)) => {
if logging.is_enabled() {
warn!(
self.log,
"Failed jwt authorization";
"error" => ?err,
"id" => &engine.id
);
}
*state_lock = EngineState::AuthFailed
// Send the node our latest forkchoice_state, it may assist with syncing.
self.send_latest_forkchoice_state().await;
*state_lock = EngineState::Syncing
}
Err(EngineApiError::Auth(err)) => {
if logging.is_enabled() {
warn!(
self.log,
"Failed jwt authorization";
"error" => ?err,
);
}
Err(e) => {
if logging.is_enabled() {
warn!(
self.log,
"Execution engine offline";
"error" => ?e,
"id" => &engine.id
)
}
*state_lock = EngineState::AuthFailed
}
Err(e) => {
if logging.is_enabled() {
warn!(
self.log,
"Execution engine offline";
"error" => ?e,
)
}
}
}
*state_lock
});
}
let num_synced = join_all(upcheck_futures)
.await
.into_iter()
.filter(|state: &EngineState| *state == EngineState::Synced)
.count();
if num_synced == 0 && logging.is_enabled() {
if *state_lock != EngineState::Synced && logging.is_enabled() {
crit!(
self.log,
"No synced execution engines";
@@ -355,111 +336,89 @@ impl Engines {
{
let mut errors = vec![];
for engine in &self.engines {
let (engine_synced, engine_auth_failed) = {
let state = engine.state.read().await;
(
*state == EngineState::Synced,
*state == EngineState::AuthFailed,
)
};
if engine_synced {
match func(engine).await {
Ok(result) => return Ok(result),
Err(error) => {
debug!(
self.log,
"Execution engine call failed";
"error" => ?error,
"id" => &engine.id
);
*engine.state.write().await = EngineState::Offline;
errors.push(EngineError::Api {
id: engine.id.clone(),
error,
})
}
let (engine_synced, engine_auth_failed) = {
let state = self.engine.state.read().await;
(
*state == EngineState::Synced,
*state == EngineState::AuthFailed,
)
};
if engine_synced {
match func(&self.engine).await {
Ok(result) => return Ok(result),
Err(error) => {
debug!(
self.log,
"Execution engine call failed";
"error" => ?error,
"id" => &&self.engine.id
);
*self.engine.state.write().await = EngineState::Offline;
errors.push(EngineError::Api {
id: self.engine.id.clone(),
error,
})
}
} else if engine_auth_failed {
errors.push(EngineError::Auth {
id: engine.id.clone(),
})
} else {
errors.push(EngineError::Offline {
id: engine.id.clone(),
})
}
} else if engine_auth_failed {
errors.push(EngineError::Auth {
id: self.engine.id.clone(),
})
} else {
errors.push(EngineError::Offline {
id: self.engine.id.clone(),
})
}
Err(errors)
}
/// Runs `func` on all nodes concurrently, returning all results. Any nodes that are offline
/// will be ignored, however all synced or unsynced nodes will receive the broadcast.
/// Runs `func` on the node.
///
/// This function might try to run `func` twice. If all nodes return an error on the first time
/// it runs, it will try to upcheck all offline nodes and then run the function again.
pub async fn broadcast<'a, F, G, H>(&'a self, func: F) -> Vec<Result<H, EngineError>>
pub async fn broadcast<'a, F, G, H>(&'a self, func: F) -> Result<H, EngineError>
where
F: Fn(&'a Engine<EngineApi>) -> G + Copy,
G: Future<Output = Result<H, EngineApiError>>,
{
let first_results = self.broadcast_without_retry(func).await;
let mut any_offline = false;
for result in &first_results {
match result {
Ok(_) => return first_results,
Err(EngineError::Offline { .. }) => any_offline = true,
_ => (),
match self.broadcast_without_retry(func).await {
Err(EngineError::Offline { .. }) => {
self.upcheck_not_synced(Logging::Enabled).await;
self.broadcast_without_retry(func).await
}
}
if any_offline {
self.upcheck_not_synced(Logging::Enabled).await;
self.broadcast_without_retry(func).await
} else {
first_results
other => other,
}
}
/// Runs `func` on all nodes concurrently, returning all results.
pub async fn broadcast_without_retry<'a, F, G, H>(
&'a self,
func: F,
) -> Vec<Result<H, EngineError>>
/// Runs `func` on the node if it's last state is not offline.
pub async fn broadcast_without_retry<'a, F, G, H>(&'a self, func: F) -> Result<H, EngineError>
where
F: Fn(&'a Engine<EngineApi>) -> G,
G: Future<Output = Result<H, EngineApiError>>,
{
let func = &func;
let futures = self.engines.iter().map(|engine| async move {
let is_offline = *engine.state.read().await == EngineState::Offline;
if !is_offline {
match func(engine).await {
Ok(res) => Ok(res),
Err(error) => {
debug!(
self.log,
"Execution engine call failed";
"error" => ?error,
"id" => &engine.id
);
*engine.state.write().await = EngineState::Offline;
Err(EngineError::Api {
id: engine.id.clone(),
error,
})
}
if *self.engine.state.read().await == EngineState::Offline {
Err(EngineError::Offline {
id: self.engine.id.clone(),
})
} else {
match func(&self.engine).await {
Ok(res) => Ok(res),
Err(error) => {
debug!(
self.log,
"Execution engine call failed";
"error" => ?error,
);
*self.engine.state.write().await = EngineState::Offline;
Err(EngineError::Api {
id: self.engine.id.clone(),
error,
})
}
} else {
Err(EngineError::Offline {
id: engine.id.clone(),
})
}
});
join_all(futures).await
}
}
}

beacon_node/execution_layer/src/lib.rs

@@ -158,72 +158,60 @@ impl ExecutionLayer {
let Config {
execution_endpoints: urls,
builder_endpoints: builder_urls,
mut secret_files,
secret_files,
suggested_fee_recipient,
jwt_id,
jwt_version,
default_datadir,
} = config;
if urls.is_empty() {
return Err(Error::NoEngines);
if urls.len() > 1 {
warn!(log, "Only the first execution engine url will be used");
}
let execution_url = urls.into_iter().next().ok_or(Error::NoEngines)?;
// Extend the jwt secret files with the default jwt secret path if not provided via cli.
// This ensures that we have a jwt secret for every EL.
secret_files.extend(vec![
default_datadir.join(DEFAULT_JWT_FILE);
urls.len().saturating_sub(secret_files.len())
]);
let secrets: Vec<(JwtKey, PathBuf)> = secret_files
.iter()
.map(|p| {
// Read secret from file if it already exists
if p.exists() {
std::fs::read_to_string(p)
.map_err(|e| {
format!("Failed to read JWT secret file {:?}, error: {:?}", p, e)
})
.and_then(|ref s| {
let secret = JwtKey::from_slice(
&hex::decode(strip_prefix(s.trim_end()))
.map_err(|e| format!("Invalid hex string: {:?}", e))?,
)?;
Ok((secret, p.to_path_buf()))
})
} else {
// Create a new file and write a randomly generated secret to it if file does not exist
std::fs::File::options()
.write(true)
.create_new(true)
.open(p)
.map_err(|e| {
format!("Failed to open JWT secret file {:?}, error: {:?}", p, e)
})
.and_then(|mut f| {
let secret = auth::JwtKey::random();
f.write_all(secret.hex_string().as_bytes()).map_err(|e| {
format!("Failed to write to JWT secret file: {:?}", e)
})?;
Ok((secret, p.to_path_buf()))
})
}
})
.collect::<Result<_, _>>()
.map_err(Error::InvalidJWTSecret)?;
let engines: Vec<Engine<EngineApi>> = urls
// Use the default jwt secret path if not provided via cli.
let secret_file = secret_files
.into_iter()
.zip(secrets.into_iter())
.map(|(url, (secret, path))| {
let id = url.to_string();
let auth = Auth::new(secret, jwt_id.clone(), jwt_version.clone());
debug!(log, "Loaded execution endpoint"; "endpoint" => %id, "jwt_path" => ?path);
let api = HttpJsonRpc::<EngineApi>::new_with_auth(url, auth)?;
Ok(Engine::<EngineApi>::new(id, api))
})
.collect::<Result<_, ApiError>>()?;
.next()
.unwrap_or_else(|| default_datadir.join(DEFAULT_JWT_FILE));
let jwt_key = if secret_file.exists() {
// Read secret from file if it already exists
std::fs::read_to_string(&secret_file)
.map_err(|e| format!("Failed to read JWT secret file. Error: {:?}", e))
.and_then(|ref s| {
let secret = JwtKey::from_slice(
&hex::decode(strip_prefix(s.trim_end()))
.map_err(|e| format!("Invalid hex string: {:?}", e))?,
)?;
Ok(secret)
})
.map_err(Error::InvalidJWTSecret)
} else {
// Create a new file and write a randomly generated secret to it if file does not exist
std::fs::File::options()
.write(true)
.create_new(true)
.open(&secret_file)
.map_err(|e| format!("Failed to open JWT secret file. Error: {:?}", e))
.and_then(|mut f| {
let secret = auth::JwtKey::random();
f.write_all(secret.hex_string().as_bytes())
.map_err(|e| format!("Failed to write to JWT secret file: {:?}", e))?;
Ok(secret)
})
.map_err(Error::InvalidJWTSecret)
}?;
let engine: Engine<EngineApi> = {
let id = execution_url.to_string();
let auth = Auth::new(jwt_key, jwt_id, jwt_version);
debug!(log, "Loaded execution endpoint"; "endpoint" => %id, "jwt_path" => ?secret_file.as_path());
let api = HttpJsonRpc::<EngineApi>::new_with_auth(execution_url, auth)
.map_err(Error::ApiError)?;
Engine::<EngineApi>::new(id, api)
};
let builders: Vec<Engine<BuilderApi>> = builder_urls
.into_iter()
@@ -236,7 +224,7 @@ impl ExecutionLayer {
let inner = Inner {
engines: Engines {
engines,
engine,
latest_forkchoice_state: <_>::default(),
log: log.clone(),
},
@@ -455,7 +443,7 @@ impl ExecutionLayer {
/// Returns `true` if there is at least one synced and reachable engine.
pub async fn is_synced(&self) -> bool {
self.engines().any_synced().await
self.engines().is_synced().await
}
/// Updates the proposer preparation data provided by validators
@@ -750,7 +738,7 @@ impl ExecutionLayer {
process_multiple_payload_statuses(
execution_payload.block_hash,
broadcast_results.into_iter(),
Some(broadcast_results).into_iter(),
self.log(),
)
}
@@ -903,7 +891,7 @@ impl ExecutionLayer {
};
process_multiple_payload_statuses(
head_block_hash,
broadcast_results
Some(broadcast_results)
.into_iter()
.chain(builder_broadcast_results.into_iter())
.map(|result| result.map(|response| response.payload_status)),
@@ -918,49 +906,49 @@ impl ExecutionLayer {
terminal_block_number: 0,
};
let broadcast_results = self
let broadcast_result = self
.engines()
.broadcast(|engine| engine.api.exchange_transition_configuration_v1(local))
.await;
let mut errors = vec![];
for (i, result) in broadcast_results.into_iter().enumerate() {
match result {
Ok(remote) => {
if local.terminal_total_difficulty != remote.terminal_total_difficulty
|| local.terminal_block_hash != remote.terminal_block_hash
{
error!(
self.log(),
"Execution client config mismatch";
"msg" => "ensure lighthouse and the execution client are up-to-date and \
configured consistently",
"execution_endpoint" => i,
"remote" => ?remote,
"local" => ?local,
);
errors.push(EngineError::Api {
id: i.to_string(),
error: ApiError::TransitionConfigurationMismatch,
});
} else {
debug!(
self.log(),
"Execution client config is OK";
"execution_endpoint" => i
);
}
}
Err(e) => {
// Having no fallbacks, the id of the used node is 0
let i = 0usize;
match broadcast_result {
Ok(remote) => {
if local.terminal_total_difficulty != remote.terminal_total_difficulty
|| local.terminal_block_hash != remote.terminal_block_hash
{
error!(
self.log(),
"Unable to get transition config";
"error" => ?e,
"Execution client config mismatch";
"msg" => "ensure lighthouse and the execution client are up-to-date and \
configured consistently",
"execution_endpoint" => i,
"remote" => ?remote,
"local" => ?local,
);
errors.push(EngineError::Api {
id: i.to_string(),
error: ApiError::TransitionConfigurationMismatch,
});
} else {
debug!(
self.log(),
"Execution client config is OK";
"execution_endpoint" => i
);
errors.push(e);
}
}
Err(e) => {
error!(
self.log(),
"Unable to get transition config";
"error" => ?e,
"execution_endpoint" => i,
);
errors.push(e);
}
}
if errors.is_empty() {
@@ -1102,8 +1090,7 @@ impl ExecutionLayer {
&[metrics::IS_VALID_TERMINAL_POW_BLOCK_HASH],
);
let broadcast_results = self
.engines()
self.engines()
.broadcast(|engine| async move {
if let Some(pow_block) = self.get_pow_block(engine, block_hash).await? {
if let Some(pow_parent) =
@@ -1116,38 +1103,8 @@ impl ExecutionLayer {
}
Ok(None)
})
.await;
let mut errors = vec![];
let mut terminal = 0;
let mut not_terminal = 0;
let mut block_missing = 0;
for result in broadcast_results {
match result {
Ok(Some(true)) => terminal += 1,
Ok(Some(false)) => not_terminal += 1,
Ok(None) => block_missing += 1,
Err(e) => errors.push(e),
}
}
if terminal > 0 && not_terminal > 0 {
crit!(
self.log(),
"Consensus failure between execution nodes";
"method" => "is_valid_terminal_pow_block_hash"
);
}
if terminal > 0 {
Ok(Some(true))
} else if not_terminal > 0 {
Ok(Some(false))
} else if block_missing > 0 {
Ok(None)
} else {
Err(Error::EngineErrors(errors))
}
.await
.map_err(|e| Error::EngineErrors(vec![e]))
}
/// This function should remain internal.