diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index 07b3343d2..d51c481d1 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -172,25 +172,24 @@ func fetchCids( cids []cid.Cid, cb func(int, blocks.Block) error, ) error { - fetchedBlocks := bserv.GetBlocks(ctx, cids) + + ctx, cancel := context.WithCancel(ctx) + defer cancel() cidIndex := make(map[cid.Cid]int) for i, c := range cids { cidIndex[c] = i } + if len(cids) != len(cidIndex) { + return fmt.Errorf("duplicate CIDs in fetchCids input") + } + + fetchedBlocks := bserv.GetBlocks(ctx, cids) for i := 0; i < len(cids); i++ { select { case block, ok := <-fetchedBlocks: if !ok { - // Closed channel, no more blocks fetched, check if we have all - // of the CIDs requested. - // FIXME: Review this check. We don't call the callback on the - // last index? - if i == len(cids)-1 { - break - } - return fmt.Errorf("failed to fetch all messages") } diff --git a/chain/sub/incoming_test.go b/chain/sub/incoming_test.go new file mode 100644 index 000000000..215439209 --- /dev/null +++ b/chain/sub/incoming_test.go @@ -0,0 +1,63 @@ +package sub + +import ( + "context" + "testing" + + address "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/chain/types" + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" +) + +type getter struct { + msgs []*types.Message +} + +func (g *getter) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { panic("NYI") } + +func (g *getter) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block { + ch := make(chan blocks.Block, len(g.msgs)) + for _, m := range g.msgs { + by, err := m.Serialize() + if err != nil { + panic(err) + } + b, err := blocks.NewBlockWithCid(by, m.Cid()) + if err != nil { + panic(err) + } + ch <- b + } + close(ch) + return ch +} + +func TestFetchCidsWithDedup(t *testing.T) { + msgs := []*types.Message{} + for i := 0; i < 10; i++ { + msgs = append(msgs, &types.Message{ + From: address.TestAddress, + To: address.TestAddress, + + Nonce: uint64(i), + }) + } + cids := []cid.Cid{} + for _, m := range msgs { + cids = append(cids, m.Cid()) + } + g := &getter{msgs} + + // the cids have a duplicate + res, err := FetchMessagesByCids(context.TODO(), g, append(cids, cids[0])) + + t.Logf("err: %+v", err) + t.Logf("res: %+v", res) + if err == nil { + t.Errorf("there should be an error") + } + if err == nil && (res[0] == nil || res[len(res)-1] == nil) { + t.Fatalf("there is a nil message: first %p, last %p", res[0], res[len(res)-1]) + } +} diff --git a/chain/sync.go b/chain/sync.go index 240d1edef..c280e3a40 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -217,6 +217,12 @@ func (syncer *Syncer) Stop() { // This should be called when connecting to new peers, and additionally // when receiving new blocks from the network func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) bool { + defer func() { + if err := recover(); err != nil { + log.Errorf("panic in InformNewHead: ", err) + } + }() + ctx := context.Background() if fts == nil { log.Errorf("got nil tipset in InformNewHead") @@ -1281,9 +1287,11 @@ func (syncer *Syncer) collectHeaders(ctx context.Context, incoming *types.TipSet blockSet := []*types.TipSet{incoming} + // Parent of the new (possibly better) tipset that we need to fetch next. at := incoming.Parents() - // we want to sync all the blocks until the height above the block we have + // we want to sync all the blocks until the height above our + // best tipset so far untilHeight := known.Height() + 1 ss.SetHeight(blockSet[len(blockSet)-1].Height()) @@ -1377,13 +1385,17 @@ loop: } base := blockSet[len(blockSet)-1] - if base.Parents() == known.Parents() { - // common case: receiving a block thats potentially part of the same tipset as our best block + if base.IsChildOf(known) { + // common case: receiving blocks that are building on top of our best tipset return blockSet, nil } - if types.CidArrsEqual(base.Parents().Cids(), known.Cids()) { - // common case: receiving blocks that are building on top of our best tipset + knownParent, err := syncer.store.LoadTipSet(known.Parents()) + if err != nil { + return nil, xerrors.Errorf("failed to load next local tipset: %w", err) + } + if base.IsChildOf(knownParent) { + // common case: receiving a block thats potentially part of the same tipset as our best block return blockSet, nil }