properly handle partial sync responses

This commit is contained in:
Jeromy 2020-04-01 11:35:09 -07:00
parent f6c260c49f
commit 27d57264d9
3 changed files with 26 additions and 15 deletions

View File

@ -105,7 +105,7 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tsk types.TipSetKey, count i
continue continue
} }
if res.Status == 0 { if res.Status == StatusOK || res.Status == StatusPartial {
resp, err := bs.processBlocksResponse(req, res) resp, err := bs.processBlocksResponse(req, res)
if err != nil { if err != nil {
return nil, xerrors.Errorf("success response from peer failed to process: %w", err) return nil, xerrors.Errorf("success response from peer failed to process: %w", err)
@ -114,6 +114,7 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tsk types.TipSetKey, count i
bs.host.ConnManager().TagPeer(p, "bsync", 25) bs.host.ConnManager().TagPeer(p, "bsync", 25)
return resp, nil return resp, nil
} }
oerr = bs.processStatus(req, res) oerr = bs.processStatus(req, res)
if oerr != nil { if oerr != nil {
log.Warnf("BlockSync peer %s response was an error: %s", p.String(), oerr) log.Warnf("BlockSync peer %s response was an error: %s", p.String(), oerr)
@ -145,7 +146,7 @@ func (bs *BlockSync) GetFullTipSet(ctx context.Context, p peer.ID, tsk types.Tip
return bstsToFullTipSet(bts) return bstsToFullTipSet(bts)
case 101: // Partial Response case 101: // Partial Response
return nil, xerrors.Errorf("partial responses are not handled") return nil, xerrors.Errorf("partial responses are not handled for single tipset fetching")
case 201: // req.Start not found case 201: // req.Start not found
return nil, fmt.Errorf("not found") return nil, fmt.Errorf("not found")
case 202: // Go Away case 202: // Go Away
@ -185,7 +186,7 @@ func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, coun
req := &BlockSyncRequest{ req := &BlockSyncRequest{
Start: h.Cids(), Start: h.Cids(),
RequestLength: count, RequestLength: count,
Options: BSOptMessages | BSOptBlocks, Options: BSOptMessages,
} }
var err error var err error
@ -205,8 +206,8 @@ func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, coun
} }
if res.Status == StatusPartial { if res.Status == StatusPartial {
log.Warn("dont yet handle partial responses") // TODO: track partial response sizes to ensure we don't overrequest too often
continue return res.Chain, nil
} }
err = bs.processStatus(req, res) err = bs.processStatus(req, res)

View File

@ -321,7 +321,7 @@ func zipTipSetAndMessages(bs cbor.IpldStore, ts *types.TipSet, allbmsgs []*types
} }
if b.Messages != mrcid { if b.Messages != mrcid {
return nil, fmt.Errorf("messages didnt match message root in header") return nil, fmt.Errorf("messages didnt match message root in header for ts %s", ts.Key())
} }
fb := &types.FullBlock{ fb := &types.FullBlock{
@ -1133,25 +1133,35 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS
batchSize = i batchSize = i
} }
next := headers[i-batchSize] nextI := (i + 1) - batchSize // want to fetch batchSize values, 'i' points to last one we want to fetch, so its 'inclusive' of our request, thus we need to add one to our request start index
bstips, err := syncer.Bsync.GetChainMessages(ctx, next, uint64(batchSize+1))
var bstout []*blocksync.BSTipSet
for len(bstout) < batchSize {
next := headers[nextI]
nreq := batchSize - len(bstout)
bstips, err := syncer.Bsync.GetChainMessages(ctx, next, uint64(nreq))
if err != nil { if err != nil {
return xerrors.Errorf("message processing failed: %w", err) return xerrors.Errorf("message processing failed: %w", err)
} }
for bsi := 0; bsi < len(bstips); bsi++ { bstout = append(bstout, bstips...)
nextI += len(bstips)
}
for bsi := 0; bsi < len(bstout); bsi++ {
// temp storage so we don't persist data we dont want to // temp storage so we don't persist data we dont want to
ds := dstore.NewMapDatastore() ds := dstore.NewMapDatastore()
bs := bstore.NewBlockstore(ds) bs := bstore.NewBlockstore(ds)
blks := cbor.NewCborStore(bs) blks := cbor.NewCborStore(bs)
this := headers[i-bsi] this := headers[i-bsi]
bstip := bstips[len(bstips)-(bsi+1)] bstip := bstout[len(bstout)-(bsi+1)]
fts, err := zipTipSetAndMessages(blks, this, bstip.BlsMessages, bstip.SecpkMessages, bstip.BlsMsgIncludes, bstip.SecpkMsgIncludes) fts, err := zipTipSetAndMessages(blks, this, bstip.BlsMessages, bstip.SecpkMessages, bstip.BlsMsgIncludes, bstip.SecpkMsgIncludes)
if err != nil { if err != nil {
log.Warnw("zipping failed", "error", err, "bsi", bsi, "i", i, log.Warnw("zipping failed", "error", err, "bsi", bsi, "i", i,
"height", this.Height(), "bstip-height", bstip.Blocks[0].Height, "height", this.Height(), "bstip-height", bstip.Blocks[0].Height,
"bstips", bstips, "next-height", i+batchSize) "next-height", i+batchSize)
return xerrors.Errorf("message processing failed: %w", err) return xerrors.Errorf("message processing failed: %w", err)
} }
@ -1167,7 +1177,7 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS
return xerrors.Errorf("message processing failed: %w", err) return xerrors.Errorf("message processing failed: %w", err)
} }
} }
i -= windowSize i -= batchSize
} }
return nil return nil

View File

@ -52,7 +52,7 @@ func (crt *incrt) Read(buf []byte) (int, error) {
err := crt.rd.SetReadDeadline(start.Add(crt.wait)) err := crt.rd.SetReadDeadline(start.Add(crt.wait))
if err != nil { if err != nil {
log.Warnf("unable to set daedline: %+v", err) log.Warnf("unable to set deadline: %+v", err)
} }
n, err := crt.rd.Read(buf) n, err := crt.rd.Read(buf)