From 27d57264d9891d868b7dd3153818625486e0f509 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 1 Apr 2020 11:35:09 -0700 Subject: [PATCH] properly handle partial sync responses --- chain/blocksync/blocksync_client.go | 11 ++++++----- chain/sync.go | 28 +++++++++++++++++++--------- lib/increadtimeout/incrt.go | 2 +- 3 files changed, 26 insertions(+), 15 deletions(-) diff --git a/chain/blocksync/blocksync_client.go b/chain/blocksync/blocksync_client.go index c1048934f..0790cb128 100644 --- a/chain/blocksync/blocksync_client.go +++ b/chain/blocksync/blocksync_client.go @@ -105,7 +105,7 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tsk types.TipSetKey, count i continue } - if res.Status == 0 { + if res.Status == StatusOK || res.Status == StatusPartial { resp, err := bs.processBlocksResponse(req, res) if err != nil { return nil, xerrors.Errorf("success response from peer failed to process: %w", err) @@ -114,6 +114,7 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tsk types.TipSetKey, count i bs.host.ConnManager().TagPeer(p, "bsync", 25) return resp, nil } + oerr = bs.processStatus(req, res) if oerr != nil { log.Warnf("BlockSync peer %s response was an error: %s", p.String(), oerr) @@ -145,7 +146,7 @@ func (bs *BlockSync) GetFullTipSet(ctx context.Context, p peer.ID, tsk types.Tip return bstsToFullTipSet(bts) case 101: // Partial Response - return nil, xerrors.Errorf("partial responses are not handled") + return nil, xerrors.Errorf("partial responses are not handled for single tipset fetching") case 201: // req.Start not found return nil, fmt.Errorf("not found") case 202: // Go Away @@ -185,7 +186,7 @@ func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, coun req := &BlockSyncRequest{ Start: h.Cids(), RequestLength: count, - Options: BSOptMessages | BSOptBlocks, + Options: BSOptMessages, } var err error @@ -205,8 +206,8 @@ func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, coun } if res.Status == StatusPartial { - log.Warn("dont yet handle partial responses") - continue + // TODO: track partial response sizes to ensure we don't overrequest too often + return res.Chain, nil } err = bs.processStatus(req, res) diff --git a/chain/sync.go b/chain/sync.go index a29d09594..bbbbfefc7 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -321,7 +321,7 @@ func zipTipSetAndMessages(bs cbor.IpldStore, ts *types.TipSet, allbmsgs []*types } if b.Messages != mrcid { - return nil, fmt.Errorf("messages didnt match message root in header") + return nil, fmt.Errorf("messages didnt match message root in header for ts %s", ts.Key()) } fb := &types.FullBlock{ @@ -1133,25 +1133,35 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS batchSize = i } - next := headers[i-batchSize] - bstips, err := syncer.Bsync.GetChainMessages(ctx, next, uint64(batchSize+1)) - if err != nil { - return xerrors.Errorf("message processing failed: %w", err) + nextI := (i + 1) - batchSize // want to fetch batchSize values, 'i' points to last one we want to fetch, so its 'inclusive' of our request, thus we need to add one to our request start index + + var bstout []*blocksync.BSTipSet + for len(bstout) < batchSize { + next := headers[nextI] + + nreq := batchSize - len(bstout) + bstips, err := syncer.Bsync.GetChainMessages(ctx, next, uint64(nreq)) + if err != nil { + return xerrors.Errorf("message processing failed: %w", err) + } + + bstout = append(bstout, bstips...) + nextI += len(bstips) } - for bsi := 0; bsi < len(bstips); bsi++ { + for bsi := 0; bsi < len(bstout); bsi++ { // temp storage so we don't persist data we dont want to ds := dstore.NewMapDatastore() bs := bstore.NewBlockstore(ds) blks := cbor.NewCborStore(bs) this := headers[i-bsi] - bstip := bstips[len(bstips)-(bsi+1)] + bstip := bstout[len(bstout)-(bsi+1)] fts, err := zipTipSetAndMessages(blks, this, bstip.BlsMessages, bstip.SecpkMessages, bstip.BlsMsgIncludes, bstip.SecpkMsgIncludes) if err != nil { log.Warnw("zipping failed", "error", err, "bsi", bsi, "i", i, "height", this.Height(), "bstip-height", bstip.Blocks[0].Height, - "bstips", bstips, "next-height", i+batchSize) + "next-height", i+batchSize) return xerrors.Errorf("message processing failed: %w", err) } @@ -1167,7 +1177,7 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS return xerrors.Errorf("message processing failed: %w", err) } } - i -= windowSize + i -= batchSize } return nil diff --git a/lib/increadtimeout/incrt.go b/lib/increadtimeout/incrt.go index e273b03ab..ca44e0a6d 100644 --- a/lib/increadtimeout/incrt.go +++ b/lib/increadtimeout/incrt.go @@ -52,7 +52,7 @@ func (crt *incrt) Read(buf []byte) (int, error) { err := crt.rd.SetReadDeadline(start.Add(crt.wait)) if err != nil { - log.Warnf("unable to set daedline: %+v", err) + log.Warnf("unable to set deadline: %+v", err) } n, err := crt.rd.Read(buf)