fix(eth1/service): use self instead of Service

This commit is contained in:
Georgios Konstantopoulos 2020-06-21 21:50:33 +03:00
parent 710409c2ba
commit 81a89fb773
No known key found for this signature in database
GPG Key ID: FA607837CD26EDBC

View File

@ -246,32 +246,34 @@ impl Service {
///
/// Emits logs for debugging and errors.
pub async fn update(
service: Self,
&self,
) -> Result<(DepositCacheUpdateOutcome, BlockCacheUpdateOutcome), String> {
let update_deposit_cache = async {
let outcome = Service::update_deposit_cache(service.clone())
let outcome = self
.update_deposit_cache()
.await
.map_err(|e| format!("Failed to update eth1 cache: {:?}", e))?;
trace!(
service.log,
self.log,
"Updated eth1 deposit cache";
"cached_deposits" => service.inner.deposit_cache.read().cache.len(),
"cached_deposits" => self.inner.deposit_cache.read().cache.len(),
"logs_imported" => outcome.logs_imported,
"last_processed_eth1_block" => service.inner.deposit_cache.read().last_processed_block,
"last_processed_eth1_block" => self.inner.deposit_cache.read().last_processed_block,
);
Ok(outcome)
};
let update_block_cache = async {
let outcome = Service::update_block_cache(service.clone())
let outcome = self
.update_block_cache()
.await
.map_err(|e| format!("Failed to update eth1 cache: {:?}", e))?;
trace!(
service.log,
self.log,
"Updated eth1 block cache";
"cached_blocks" => service.inner.block_cache.read().len(),
"cached_blocks" => self.inner.block_cache.read().len(),
"blocks_imported" => outcome.blocks_imported,
"head_block" => outcome.head_block_number,
);
@ -290,33 +292,31 @@ impl Service {
/// - Err(_) if there is an error.
///
/// Emits logs for debugging and errors.
pub fn auto_update(service: Self, handle: environment::TaskExecutor) {
let update_interval = Duration::from_millis(service.config().auto_update_interval_millis);
pub fn auto_update(self, handle: environment::TaskExecutor) {
let update_interval = Duration::from_millis(self.config().auto_update_interval_millis);
let mut interval = interval_at(Instant::now(), update_interval);
let update_future = async move {
while interval.next().await.is_some() {
Service::do_update(service.clone(), update_interval)
.await
.ok();
self.do_update(update_interval).await.ok();
}
};
handle.spawn(update_future, "eth1");
}
async fn do_update(service: Self, update_interval: Duration) -> Result<(), ()> {
let update_result = Service::update(service.clone()).await;
async fn do_update(&self, update_interval: Duration) -> Result<(), ()> {
let update_result = self.update().await;
match update_result {
Err(e) => error!(
service.log,
self.log,
"Failed to update eth1 cache";
"retry_millis" => update_interval.as_millis(),
"error" => e,
),
Ok((deposit, block)) => debug!(
service.log,
self.log,
"Updated eth1 cache";
"retry_millis" => update_interval.as_millis(),
"blocks" => format!("{:?}", block),
@ -338,23 +338,23 @@ impl Service {
/// - Err(_) if there is an error.
///
/// Emits logs for debugging and errors.
pub async fn update_deposit_cache(service: Self) -> Result<DepositCacheUpdateOutcome, Error> {
let endpoint = service.config().endpoint.clone();
let follow_distance = service.config().follow_distance;
let deposit_contract_address = service.config().deposit_contract_address.clone();
pub async fn update_deposit_cache(&self) -> Result<DepositCacheUpdateOutcome, Error> {
let endpoint = self.config().endpoint.clone();
let follow_distance = self.config().follow_distance;
let deposit_contract_address = self.config().deposit_contract_address.clone();
let blocks_per_log_query = service.config().blocks_per_log_query;
let max_log_requests_per_update = service
let blocks_per_log_query = self.config().blocks_per_log_query;
let max_log_requests_per_update = self
.config()
.max_log_requests_per_update
.unwrap_or_else(usize::max_value);
let next_required_block = service
let next_required_block = self
.deposits()
.read()
.last_processed_block
.map(|n| n + 1)
.unwrap_or_else(|| service.config().deposit_contract_deploy_block);
.unwrap_or_else(|| self.config().deposit_contract_deploy_block);
let range = get_new_block_numbers(&endpoint, next_required_block, follow_distance).await?;
@ -398,7 +398,7 @@ impl Service {
let mut logs_imported = 0;
for (block_range, log_chunk) in logs.iter() {
let mut cache = service.deposits().write();
let mut cache = self.deposits().write();
log_chunk
.into_iter()
.map(|raw_log| {
@ -442,18 +442,18 @@ impl Service {
if logs_imported > 0 {
info!(
service.log,
self.log,
"Imported deposit log(s)";
"latest_block" => service.inner.deposit_cache.read().cache.latest_block_number(),
"total" => service.deposit_cache_len(),
"latest_block" => self.inner.deposit_cache.read().cache.latest_block_number(),
"total" => self.deposit_cache_len(),
"new" => logs_imported
);
} else {
debug!(
service.log,
self.log,
"No new deposits found";
"latest_block" => service.inner.deposit_cache.read().cache.latest_block_number(),
"total_deposits" => service.deposit_cache_len(),
"latest_block" => self.inner.deposit_cache.read().cache.latest_block_number(),
"total_deposits" => self.deposit_cache_len(),
);
}
@ -471,23 +471,23 @@ impl Service {
/// - Err(_) if there is an error.
///
/// Emits logs for debugging and errors.
pub async fn update_block_cache(service: Self) -> Result<BlockCacheUpdateOutcome, Error> {
let block_cache_truncation = service.config().block_cache_truncation;
let max_blocks_per_update = service
pub async fn update_block_cache(&self) -> Result<BlockCacheUpdateOutcome, Error> {
let block_cache_truncation = self.config().block_cache_truncation;
let max_blocks_per_update = self
.config()
.max_blocks_per_update
.unwrap_or_else(usize::max_value);
let next_required_block = service
let next_required_block = self
.inner
.block_cache
.read()
.highest_block_number()
.map(|n| n + 1)
.unwrap_or_else(|| service.config().lowest_cached_block_number);
.unwrap_or_else(|| self.config().lowest_cached_block_number);
let endpoint = service.config().endpoint.clone();
let follow_distance = service.config().follow_distance;
let endpoint = self.config().endpoint.clone();
let follow_distance = self.config().follow_distance;
let range = get_new_block_numbers(&endpoint, next_required_block, follow_distance).await?;
// Map the range of required blocks into a Vec.
@ -509,7 +509,7 @@ impl Service {
// If the range of required blocks is larger than `max_size`, drop all
// existing blocks and download `max_size` count of blocks.
let first_block = range.end() - max_size;
(*service.inner.block_cache.write()) = BlockCache::default();
(*self.inner.block_cache.write()) = BlockCache::default();
(first_block..=*range.end()).collect::<Vec<u64>>()
} else {
range.collect::<Vec<u64>>()
@ -520,7 +520,7 @@ impl Service {
};
// Download the range of blocks and sequentially import them into the cache.
// Last processed block in deposit cache
let latest_in_cache = service
let latest_in_cache = self
.inner
.deposit_cache
.read()
@ -540,7 +540,7 @@ impl Service {
|mut block_numbers| async {
match block_numbers.next() {
Some(block_number) => {
match download_eth1_block(service.inner.clone(), block_number).await {
match download_eth1_block(self.inner.clone(), block_number).await {
Ok(eth1_block) => Ok(Some((eth1_block, block_numbers))),
Err(e) => Err(e),
}
@ -554,8 +554,7 @@ impl Service {
let mut blocks_imported = 0;
for eth1_block in eth1_blocks {
service
.inner
self.inner
.block_cache
.write()
.insert_root_or_child(eth1_block)
@ -563,12 +562,11 @@ impl Service {
metrics::set_gauge(
&metrics::BLOCK_CACHE_LEN,
service.inner.block_cache.read().len() as i64,
self.inner.block_cache.read().len() as i64,
);
metrics::set_gauge(
&metrics::LATEST_CACHED_BLOCK_TIMESTAMP,
service
.inner
self.inner
.block_cache
.read()
.latest_block_timestamp()
@ -579,14 +577,14 @@ impl Service {
}
// Prune the block cache, preventing it from growing too large.
service.inner.prune_blocks();
self.inner.prune_blocks();
metrics::set_gauge(
&metrics::BLOCK_CACHE_LEN,
service.inner.block_cache.read().len() as i64,
self.inner.block_cache.read().len() as i64,
);
let block_cache = service.inner.block_cache.read();
let block_cache = self.inner.block_cache.read();
let latest_block_mins = block_cache
.latest_block_timestamp()
.and_then(|timestamp| {
@ -600,7 +598,7 @@ impl Service {
if blocks_imported > 0 {
debug!(
service.log,
self.log,
"Imported eth1 block(s)";
"latest_block_age" => latest_block_mins,
"latest_block" => block_cache.highest_block_number(),
@ -609,7 +607,7 @@ impl Service {
);
} else {
debug!(
service.log,
self.log,
"No new eth1 blocks imported";
"latest_block" => block_cache.highest_block_number(),
"cached_blocks" => block_cache.len(),
@ -618,7 +616,7 @@ impl Service {
Ok(BlockCacheUpdateOutcome {
blocks_imported,
head_block_number: service.inner.block_cache.read().highest_block_number(),
head_block_number: self.inner.block_cache.read().highest_block_number(),
})
}
}