Merge pull request #4269 from filecoin-project/steb/simplify-sync

simplify message syncing logic
This commit is contained in:
Łukasz Magiera 2020-10-10 00:00:26 +02:00 committed by GitHub
commit 09bff14d85
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -120,12 +120,6 @@ func FetchMessagesByCids(
return err return err
} }
// FIXME: We already sort in `fetchCids`, we are duplicating too much work,
// we don't need to pass the index.
if out[i] != nil {
return fmt.Errorf("received duplicate message")
}
out[i] = msg out[i] = msg
return nil return nil
}) })
@ -149,10 +143,6 @@ func FetchSignedMessagesByCids(
return err return err
} }
if out[i] != nil {
return fmt.Errorf("received duplicate message")
}
out[i] = smsg out[i] = smsg
return nil return nil
}) })
@ -184,24 +174,29 @@ func fetchCids(
return fmt.Errorf("duplicate CIDs in fetchCids input") return fmt.Errorf("duplicate CIDs in fetchCids input")
} }
fetchedBlocks := bserv.GetBlocks(ctx, cids) for block := range bserv.GetBlocks(ctx, cids) {
ix, ok := cidIndex[block.Cid()]
for i := 0; i < len(cids); i++ { if !ok {
select { // Ignore duplicate/unexpected blocks. This shouldn't
case block, ok := <-fetchedBlocks: // happen, but we can be safe.
if !ok { log.Errorw("received duplicate/unexpected block when syncing", "cid", block.Cid())
return fmt.Errorf("failed to fetch all messages") continue
}
ix, ok := cidIndex[block.Cid()]
if !ok {
return fmt.Errorf("received message we didnt ask for")
}
if err := cb(ix, block); err != nil {
return err
}
} }
// Record that we've received the block.
delete(cidIndex, block.Cid())
if err := cb(ix, block); err != nil {
return err
}
}
if len(cidIndex) > 0 {
err := ctx.Err()
if err == nil {
err = fmt.Errorf("failed to fetch %d messages for unknown reasons", len(cidIndex))
}
return err
} }
return nil return nil