diff --git a/chain/blocksync.go b/chain/blocksync.go index ab447fd4b..b38c2a8a0 100644 --- a/chain/blocksync.go +++ b/chain/blocksync.go @@ -296,7 +296,7 @@ func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, coun req := &BlockSyncRequest{ Start: h.Cids(), RequestLength: count, - Options: BSOptMessages, + Options: BSOptMessages | BSOptBlocks, } var err error diff --git a/chain/store/store.go b/chain/store/store.go index 89f02e8c3..a6d724765 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -577,3 +577,24 @@ func (cs *ChainStore) blockContainsMsg(blk *types.BlockHeader, msg cid.Cid) (*ty func (cs *ChainStore) Blockstore() blockstore.Blockstore { return cs.bs } + +func (cs *ChainStore) TryFillTipSet(ts *types.TipSet) (*FullTipSet, error) { + var out []*types.FullBlock + + for _, b := range ts.Blocks() { + msgs, err := cs.MessagesForBlock(b) + if err != nil { + // TODO: check for 'not found' errors, and only return nil if this + // is actually a 'not found' error + return nil, nil + } + + fb := &types.FullBlock{ + Header: b, + Messages: msgs, + } + + out = append(out, fb) + } + return NewFullTipSet(out), nil +} diff --git a/chain/sync.go b/chain/sync.go index 3b764f7e0..9102a52a0 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -410,64 +410,73 @@ func (syncer *Syncer) collectHeaders(from *types.TipSet, toHeight uint64) ([]*ty } func (syncer *Syncer) syncMessagesAndCheckState(headers []*types.TipSet) error { - // Fetch all the messages for all the blocks in this chain - cur := headers[len(headers)-1] + return syncer.iterFullTipsets(headers, func(fts *store.FullTipSet) error { + if err := syncer.ValidateTipSet(context.TODO(), fts); err != nil { + log.Errorf("failed to validate tipset: %s", err) + return xerrors.Errorf("message processing failed: %w", err) + } + return nil + }) +} - windowSize := uint64(10) - for i := uint64(0); i <= cur.Height(); i += windowSize { +// fills out each of the given tipsets with messages and calls the callback with it +func (syncer *Syncer) iterFullTipsets(headers []*types.TipSet, cb func(*store.FullTipSet) error) error { + var beg int + // handle case where we have a prefix of these locally + for ; beg < len(headers); beg++ { + fts, err := syncer.store.TryFillTipSet(headers[beg]) + if err != nil { + return err + } + if fts == nil { + break + } + if err := cb(fts); err != nil { + return err + } + } + headers = headers[beg:] + + windowSize := 10 + + for i := 0; i < len(headers); i += windowSize { + // temp storage so we don't persist data we dont want to ds := dstore.NewMapDatastore() bs := bstore.NewBlockstore(ds) cst := hamt.CSTFromBstore(bs) - nextHeight := i + windowSize - 1 - if nextHeight > cur.Height() { - nextHeight = cur.Height() + batchSize := windowSize + if i+batchSize >= len(headers) { + batchSize = (len(headers) - i) - 1 } - next := headers[nextHeight] - bstips, err := syncer.Bsync.GetChainMessages(context.TODO(), next, (nextHeight+1)-i) + next := headers[i+batchSize] + bstips, err := syncer.Bsync.GetChainMessages(context.TODO(), next, uint64(batchSize+1)) if err != nil { log.Errorf("failed to fetch messages: %s", err) return xerrors.Errorf("message processing failed: %w", err) } for bsi := 0; bsi < len(bstips); bsi++ { - this := headers[i+uint64(bsi)] + this := headers[i+bsi] bstip := bstips[len(bstips)-(bsi+1)] fts, err := zipTipSetAndMessages(cst, this, bstip.Messages, bstip.MsgIncludes) if err != nil { log.Error("zipping failed: ", err, bsi, i) log.Error("height: ", this.Height()) + log.Error("bstip height: ", bstip.Blocks[0].Height) log.Error("bstips: ", bstips) - log.Error("next height: ", nextHeight) + log.Error("next height: ", i+batchSize) return xerrors.Errorf("message processing failed: %w", err) } - if err := syncer.ValidateTipSet(context.TODO(), fts); err != nil { - log.Errorf("failed to validate tipset: %s", err) - return xerrors.Errorf("message processing failed: %w", err) + if err := cb(fts); err != nil { + return err } } - for _, bst := range bstips { - for _, m := range bst.Messages { - switch m.Signature.Type { - case types.KTBLS: - //log.Infof("putting BLS message: %s", m.Cid()) - if _, err := store.PutMessage(bs, &m.Message); err != nil { - log.Error("failed to persist messages: ", err) - return xerrors.Errorf("BLS message processing failed: %w", err) - } - case types.KTSecp256k1: - //log.Infof("putting secp256k1 message: %s", m.Cid()) - if _, err := store.PutMessage(bs, m); err != nil { - log.Error("failed to persist messages: ", err) - return xerrors.Errorf("secp256k1 message processing failed: %w", err) - } - default: - return xerrors.Errorf("unknown signature type on message %s: %q", m.Cid(), m.Signature.TypeCode) - } - } + if err := persistMessages(bs, bstips); err != nil { + return err } if err := copyBlockstore(bs, syncer.store.Blockstore()); err != nil { @@ -478,6 +487,31 @@ func (syncer *Syncer) syncMessagesAndCheckState(headers []*types.TipSet) error { return nil } +func persistMessages(bs bstore.Blockstore, bstips []*BSTipSet) error { + for _, bst := range bstips { + for _, m := range bst.Messages { + switch m.Signature.Type { + case types.KTBLS: + //log.Infof("putting BLS message: %s", m.Cid()) + if _, err := store.PutMessage(bs, &m.Message); err != nil { + log.Error("failed to persist messages: ", err) + return xerrors.Errorf("BLS message processing failed: %w", err) + } + case types.KTSecp256k1: + //log.Infof("putting secp256k1 message: %s", m.Cid()) + if _, err := store.PutMessage(bs, m); err != nil { + log.Error("failed to persist messages: ", err) + return xerrors.Errorf("secp256k1 message processing failed: %w", err) + } + default: + return xerrors.Errorf("unknown signature type on message %s: %q", m.Cid(), m.Signature.TypeCode) + } + } + } + + return nil +} + func (syncer *Syncer) collectChain(fts *store.FullTipSet) error { curHeight := syncer.head.Height()