Fix bug with block processing in sync

This commit is contained in:
Paul Hauner 2019-03-24 15:18:21 +11:00
parent 5f4f67f46f
commit 1ea9959632
No known key found for this signature in database
GPG Key ID: D362883A9218FCC6

View File

@ -425,54 +425,45 @@ impl SimpleSync {
}
pub fn process_import_queue(&mut self, network: &mut NetworkContext) {
let mut blocks: Vec<(usize, BeaconBlock, PeerId)> = self
.import_queue
.partials
.iter()
.enumerate()
.filter_map(|(i, partial)| {
if let Some(_) = partial.body {
let (block, _root) = partial.clone().complete().expect("Body must be Some");
Some((i, block, partial.sender.clone()))
} else {
None
}
})
.collect();
let mut successful = 0;
let mut invalid = 0;
let mut errored = 0;
if !blocks.is_empty() {
info!(self.log, "Processing blocks"; "count" => blocks.len());
}
// Sort the blocks to be in ascending slot order.
blocks.sort_unstable_by(|a, b| a.1.slot.partial_cmp(&b.1.slot).unwrap());
let mut keys_to_delete = vec![];
for (key, block, sender) in blocks {
// Loop through all of the complete blocks in the queue.
for (queue_index, block, sender) in self.import_queue.complete_blocks() {
match self.chain.process_block(block) {
Ok(outcome) => {
if outcome.is_invalid() {
warn!(self.log, "Invalid block: {:?}", outcome);
invalid += 1;
warn!(
self.log,
"InvalidBlock";
"sender_peer_id" => format!("{:?}", sender),
"reason" => format!("{:?}", outcome),
);
network.disconnect(sender);
keys_to_delete.push(key)
} else {
// TODO: don't delete if was not invalid but not successfully processed.
keys_to_delete.push(key)
}
// If this results to true, the item will be removed from the queue.
if outcome.sucessfully_processed() {
successful += 1;
self.import_queue.partials.remove(queue_index);
}
}
Err(e) => {
error!(self.log, "Error during block processing"; "error" => format!("{:?}", e))
errored += 1;
error!(self.log, "BlockProcessingError"; "error" => format!("{:?}", e));
}
}
}
if !keys_to_delete.is_empty() {
info!(self.log, "Processed {} blocks", keys_to_delete.len());
for key in keys_to_delete {
self.import_queue.partials.remove(key);
}
}
info!(
self.log,
"ProcessBlocks";
"invalid" => invalid,
"successful" => successful,
"errored" => errored,
)
}
fn request_block_roots(
@ -557,6 +548,35 @@ impl ImportQueue {
}
}
/// Completes all possible partials into `BeaconBlock` and returns them, sorted by slot number.
/// Does not delete the partials from the queue, this must be done manually.
///
/// Returns `(queue_index, block, sender)`:
///
/// - `queue_index`: used to remove the entry if it is successfully processed.
/// - `block`: the completed block.
/// - `sender`: the `PeerId` the provided the `BeaconBlockBody` which completed the partial.
pub fn complete_blocks(&self) -> Vec<(usize, BeaconBlock, PeerId)> {
let mut completable: Vec<(usize, &PartialBeaconBlock)> = self
.partials
.iter()
.enumerate()
.filter(|(_i, partial)| partial.completable())
.collect();
// Sort the completable partials to be in ascending slot order.
completable.sort_unstable_by(|a, b| a.1.header.slot.partial_cmp(&b.1.header.slot).unwrap());
completable
.iter()
.map(|(i, partial)| {
let (block, _root, sender) =
(*partial).clone().complete().expect("Body must be Some");
(*i, block, sender)
})
.collect()
}
/// Flushes all stale entries from the queue.
///
/// An entry is stale if it has as a `inserted` time that is more than `self.stale_time` in the
@ -696,8 +716,16 @@ pub struct PartialBeaconBlock {
}
impl PartialBeaconBlock {
pub fn completable(&self) -> bool {
self.body.is_some()
}
/// Given a `body`, consumes `self` and returns a complete `BeaconBlock` along with its root.
pub fn complete(self) -> Option<(BeaconBlock, Hash256)> {
Some((self.header.into_block(self.body?), self.block_root))
pub fn complete(self) -> Option<(BeaconBlock, Hash256, PeerId)> {
Some((
self.header.into_block(self.body?),
self.block_root,
self.sender,
))
}
}