From 105aa40007f92bfd1b2e057731da76381d2ed6a2 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Fri, 9 Oct 2020 11:06:49 -0700 Subject: [PATCH] simplify message syncing logic 1. Allow duplicate blocks from bitswap. This shouldn't happen, but there's no reason to bail (just log loudly). 2. Simplify logic to very explicitly check to make sure we're fetching every block. --- chain/sub/incoming.go | 49 +++++++++++++++++++------------------------ 1 file changed, 22 insertions(+), 27 deletions(-) diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index d51c481d1..68ee5e20c 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -120,12 +120,6 @@ func FetchMessagesByCids( return err } - // FIXME: We already sort in `fetchCids`, we are duplicating too much work, - // we don't need to pass the index. - if out[i] != nil { - return fmt.Errorf("received duplicate message") - } - out[i] = msg return nil }) @@ -149,10 +143,6 @@ func FetchSignedMessagesByCids( return err } - if out[i] != nil { - return fmt.Errorf("received duplicate message") - } - out[i] = smsg return nil }) @@ -184,24 +174,29 @@ func fetchCids( return fmt.Errorf("duplicate CIDs in fetchCids input") } - fetchedBlocks := bserv.GetBlocks(ctx, cids) - - for i := 0; i < len(cids); i++ { - select { - case block, ok := <-fetchedBlocks: - if !ok { - return fmt.Errorf("failed to fetch all messages") - } - - ix, ok := cidIndex[block.Cid()] - if !ok { - return fmt.Errorf("received message we didnt ask for") - } - - if err := cb(ix, block); err != nil { - return err - } + for block := range bserv.GetBlocks(ctx, cids) { + ix, ok := cidIndex[block.Cid()] + if !ok { + // Ignore duplicate/unexpected blocks. This shouldn't + // happen, but we can be safe. + log.Errorw("received duplicate/unexpected block when syncing", "cid", block.Cid()) + continue } + + // Record that we've received the block. + delete(cidIndex, block.Cid()) + + if err := cb(ix, block); err != nil { + return err + } + } + + if len(cidIndex) > 0 { + err := ctx.Err() + if err == nil { + err = fmt.Errorf("failed to fetch %d messages for unknown reasons", len(cidIndex)) + } + return err } return nil