simplify message syncing logic
1. Allow duplicate blocks from bitswap. This shouldn't happen, but there's no reason to bail (just log loudly). 2. Simplify logic to very explicitly check to make sure we're fetching every block.
This commit is contained in:
parent
110f033c83
commit
105aa40007
@ -120,12 +120,6 @@ func FetchMessagesByCids(
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// FIXME: We already sort in `fetchCids`, we are duplicating too much work,
|
|
||||||
// we don't need to pass the index.
|
|
||||||
if out[i] != nil {
|
|
||||||
return fmt.Errorf("received duplicate message")
|
|
||||||
}
|
|
||||||
|
|
||||||
out[i] = msg
|
out[i] = msg
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
@ -149,10 +143,6 @@ func FetchSignedMessagesByCids(
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if out[i] != nil {
|
|
||||||
return fmt.Errorf("received duplicate message")
|
|
||||||
}
|
|
||||||
|
|
||||||
out[i] = smsg
|
out[i] = smsg
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
@ -184,24 +174,29 @@ func fetchCids(
|
|||||||
return fmt.Errorf("duplicate CIDs in fetchCids input")
|
return fmt.Errorf("duplicate CIDs in fetchCids input")
|
||||||
}
|
}
|
||||||
|
|
||||||
fetchedBlocks := bserv.GetBlocks(ctx, cids)
|
for block := range bserv.GetBlocks(ctx, cids) {
|
||||||
|
|
||||||
for i := 0; i < len(cids); i++ {
|
|
||||||
select {
|
|
||||||
case block, ok := <-fetchedBlocks:
|
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("failed to fetch all messages")
|
|
||||||
}
|
|
||||||
|
|
||||||
ix, ok := cidIndex[block.Cid()]
|
ix, ok := cidIndex[block.Cid()]
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("received message we didnt ask for")
|
// Ignore duplicate/unexpected blocks. This shouldn't
|
||||||
|
// happen, but we can be safe.
|
||||||
|
log.Errorw("received duplicate/unexpected block when syncing", "cid", block.Cid())
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Record that we've received the block.
|
||||||
|
delete(cidIndex, block.Cid())
|
||||||
|
|
||||||
if err := cb(ix, block); err != nil {
|
if err := cb(ix, block); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(cidIndex) > 0 {
|
||||||
|
err := ctx.Err()
|
||||||
|
if err == nil {
|
||||||
|
err = fmt.Errorf("failed to fetch %d messages for unknown reasons", len(cidIndex))
|
||||||
|
}
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
Loading…
Reference in New Issue
Block a user