diff --git a/chain/stmgr/stmgr.go b/chain/stmgr/stmgr.go index d81cf1c72..ba3dcd1d8 100644 --- a/chain/stmgr/stmgr.go +++ b/chain/stmgr/stmgr.go @@ -2,6 +2,7 @@ package stmgr import ( "context" + "errors" "fmt" "sync" @@ -507,16 +508,7 @@ func (sm *StateManager) GetReceipt(ctx context.Context, msg cid.Cid, ts *types.T return nil, fmt.Errorf("failed to load message: %w", err) } - r, _, err := sm.tipsetExecutedMessage(ts, msg, m.VMMessage()) - if err != nil { - return nil, err - } - - if r != nil { - return r, nil - } - - _, r, _, err = sm.searchBackForMsg(ctx, ts, m) + _, r, _, err := sm.searchBackForMsg(ctx, ts, m) if err != nil { return nil, fmt.Errorf("failed to look back through chain for message: %w", err) } @@ -674,6 +666,18 @@ func (sm *StateManager) SearchForMessage(ctx context.Context, mcid cid.Cid) (*ty func (sm *StateManager) searchBackForMsg(ctx context.Context, from *types.TipSet, m types.ChainMsg) (*types.TipSet, *types.MessageReceipt, cid.Cid, error) { cur := from + curActor, err := sm.LoadActor(ctx, m.VMMessage().From, cur) + if err != nil { + return nil, nil, cid.Undef, xerrors.Errorf("failed to load initital tipset") + } + + mFromId, err := sm.LookupID(ctx, m.VMMessage().From, from) + if err != nil { + return nil, nil, cid.Undef, xerrors.Errorf("looking up From id address: %w", err) + } + + mNonce := m.VMMessage().Nonce + for { if cur.Height() == 0 { // it ain't here! @@ -686,32 +690,37 @@ func (sm *StateManager) searchBackForMsg(ctx context.Context, from *types.TipSet default: } - act, err := sm.LoadActor(ctx, m.VMMessage().From, cur) - if err != nil { - return nil, nil, cid.Cid{}, err - } - // we either have no messages from the sender, or the latest message we found has a lower nonce than the one being searched for, // either way, no reason to lookback, it ain't there - if act.Nonce == 0 || act.Nonce < m.VMMessage().Nonce { + if curActor == nil || curActor.Nonce == 0 || curActor.Nonce < mNonce { return nil, nil, cid.Undef, nil } - ts, err := sm.cs.LoadTipSet(cur.Parents()) + pts, err := sm.cs.LoadTipSet(cur.Parents()) if err != nil { - return nil, nil, cid.Undef, fmt.Errorf("failed to load tipset during msg wait searchback: %w", err) + return nil, nil, cid.Undef, xerrors.Errorf("failed to load tipset during msg wait searchback: %w", err) } - r, foundMsg, err := sm.tipsetExecutedMessage(ts, m.Cid(), m.VMMessage()) - if err != nil { - return nil, nil, cid.Undef, fmt.Errorf("checking for message execution during lookback: %w", err) + act, err := sm.LoadActor(ctx, mFromId, pts) + actorNoExist := errors.Is(err, types.ErrActorNotFound) + if err != nil && !actorNoExist { + return nil, nil, cid.Cid{}, xerrors.Errorf("failed to load the actor: %w", err) } - if r != nil { - return ts, r, foundMsg, nil + // check that between cur and parent tipset the nonce fell into range of our message + if actorNoExist || (curActor.Nonce > mNonce && act.Nonce <= mNonce) { + r, foundMsg, err := sm.tipsetExecutedMessage(cur, m.Cid(), m.VMMessage()) + if err != nil { + return nil, nil, cid.Undef, xerrors.Errorf("checking for message execution during lookback: %w", err) + } + + if r != nil { + return pts, r, foundMsg, nil + } } - cur = ts + cur = pts + curActor = act } } diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index 07b3343d2..d51c481d1 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -172,25 +172,24 @@ func fetchCids( cids []cid.Cid, cb func(int, blocks.Block) error, ) error { - fetchedBlocks := bserv.GetBlocks(ctx, cids) + + ctx, cancel := context.WithCancel(ctx) + defer cancel() cidIndex := make(map[cid.Cid]int) for i, c := range cids { cidIndex[c] = i } + if len(cids) != len(cidIndex) { + return fmt.Errorf("duplicate CIDs in fetchCids input") + } + + fetchedBlocks := bserv.GetBlocks(ctx, cids) for i := 0; i < len(cids); i++ { select { case block, ok := <-fetchedBlocks: if !ok { - // Closed channel, no more blocks fetched, check if we have all - // of the CIDs requested. - // FIXME: Review this check. We don't call the callback on the - // last index? - if i == len(cids)-1 { - break - } - return fmt.Errorf("failed to fetch all messages") } diff --git a/chain/sub/incoming_test.go b/chain/sub/incoming_test.go new file mode 100644 index 000000000..215439209 --- /dev/null +++ b/chain/sub/incoming_test.go @@ -0,0 +1,63 @@ +package sub + +import ( + "context" + "testing" + + address "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/chain/types" + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" +) + +type getter struct { + msgs []*types.Message +} + +func (g *getter) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { panic("NYI") } + +func (g *getter) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block { + ch := make(chan blocks.Block, len(g.msgs)) + for _, m := range g.msgs { + by, err := m.Serialize() + if err != nil { + panic(err) + } + b, err := blocks.NewBlockWithCid(by, m.Cid()) + if err != nil { + panic(err) + } + ch <- b + } + close(ch) + return ch +} + +func TestFetchCidsWithDedup(t *testing.T) { + msgs := []*types.Message{} + for i := 0; i < 10; i++ { + msgs = append(msgs, &types.Message{ + From: address.TestAddress, + To: address.TestAddress, + + Nonce: uint64(i), + }) + } + cids := []cid.Cid{} + for _, m := range msgs { + cids = append(cids, m.Cid()) + } + g := &getter{msgs} + + // the cids have a duplicate + res, err := FetchMessagesByCids(context.TODO(), g, append(cids, cids[0])) + + t.Logf("err: %+v", err) + t.Logf("res: %+v", res) + if err == nil { + t.Errorf("there should be an error") + } + if err == nil && (res[0] == nil || res[len(res)-1] == nil) { + t.Fatalf("there is a nil message: first %p, last %p", res[0], res[len(res)-1]) + } +} diff --git a/chain/sync.go b/chain/sync.go index 240d1edef..c280e3a40 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -217,6 +217,12 @@ func (syncer *Syncer) Stop() { // This should be called when connecting to new peers, and additionally // when receiving new blocks from the network func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) bool { + defer func() { + if err := recover(); err != nil { + log.Errorf("panic in InformNewHead: ", err) + } + }() + ctx := context.Background() if fts == nil { log.Errorf("got nil tipset in InformNewHead") @@ -1281,9 +1287,11 @@ func (syncer *Syncer) collectHeaders(ctx context.Context, incoming *types.TipSet blockSet := []*types.TipSet{incoming} + // Parent of the new (possibly better) tipset that we need to fetch next. at := incoming.Parents() - // we want to sync all the blocks until the height above the block we have + // we want to sync all the blocks until the height above our + // best tipset so far untilHeight := known.Height() + 1 ss.SetHeight(blockSet[len(blockSet)-1].Height()) @@ -1377,13 +1385,17 @@ loop: } base := blockSet[len(blockSet)-1] - if base.Parents() == known.Parents() { - // common case: receiving a block thats potentially part of the same tipset as our best block + if base.IsChildOf(known) { + // common case: receiving blocks that are building on top of our best tipset return blockSet, nil } - if types.CidArrsEqual(base.Parents().Cids(), known.Cids()) { - // common case: receiving blocks that are building on top of our best tipset + knownParent, err := syncer.store.LoadTipSet(known.Parents()) + if err != nil { + return nil, xerrors.Errorf("failed to load next local tipset: %w", err) + } + if base.IsChildOf(knownParent) { + // common case: receiving a block thats potentially part of the same tipset as our best block return blockSet, nil }