fix(chainsync): support longer graphsync fetches with loop
This commit is contained in:
parent
34f755b2b9
commit
2e9a052f62
@ -41,6 +41,9 @@ const (
|
|||||||
|
|
||||||
// field index of values array AMT node
|
// field index of values array AMT node
|
||||||
amtNodeValuesFieldIndex = 2
|
amtNodeValuesFieldIndex = 2
|
||||||
|
|
||||||
|
// maximum depth per traversal
|
||||||
|
maxRequestLength = 50
|
||||||
)
|
)
|
||||||
|
|
||||||
var blockHeadersSelector, fullSelector ipld.Node
|
var blockHeadersSelector, fullSelector ipld.Node
|
||||||
@ -132,15 +135,6 @@ func (bs *BlockSync) fetchBlocksGraphSync(ctx context.Context, p peer.ID, req *B
|
|||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
sel := selectorForRequest(req)
|
|
||||||
|
|
||||||
// TODO:
|
|
||||||
for _, c := range req.Start {
|
|
||||||
if _, err := bs.bserv.GetBlock(ctx, c); err != nil {
|
|
||||||
return nil, xerrors.Errorf("failed to fetch blocks: %w")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
immediateTsSelector := firstTipsetSelector(req)
|
immediateTsSelector := firstTipsetSelector(req)
|
||||||
|
|
||||||
// Do this because we can only request one root at a time
|
// Do this because we can only request one root at a time
|
||||||
@ -150,20 +144,43 @@ func (bs *BlockSync) fetchBlocksGraphSync(ctx context.Context, p peer.ID, req *B
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// execute the selector forreal
|
var totalDepth uint64 = 0
|
||||||
if err := bs.executeGsyncSelector(ctx, p, req.Start[0], sel); err != nil {
|
var nextReq BlockSyncRequest = *req
|
||||||
return nil, err
|
var wholeChain []*BSTipSet
|
||||||
|
var reachedGenesis bool
|
||||||
|
for totalDepth < req.RequestLength && !reachedGenesis {
|
||||||
|
if nextReq.RequestLength > maxRequestLength {
|
||||||
|
nextReq.RequestLength = maxRequestLength
|
||||||
|
}
|
||||||
|
|
||||||
|
sel := selectorForRequest(&nextReq)
|
||||||
|
|
||||||
|
// execute the selector forreal
|
||||||
|
if err := bs.executeGsyncSelector(ctx, p, req.Start[0], sel); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now pull the data we fetched out of the chainstore (where it should now be persisted)
|
||||||
|
tempcs := store.NewChainStore(bs.bserv.Blockstore(), datastore.NewMapDatastore(), nil)
|
||||||
|
|
||||||
|
opts := ParseBSOptions(req.Options)
|
||||||
|
tsk := types.NewTipSetKey(req.Start...)
|
||||||
|
chain, err := collectChainSegment(tempcs, tsk, req.RequestLength, opts)
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("failed to load chain data from chainstore after successful graphsync response (start = %v): %w", req.Start, err)
|
||||||
|
}
|
||||||
|
wholeChain = append(wholeChain, chain...)
|
||||||
|
totalDepth += nextReq.RequestLength
|
||||||
|
nextCids := make([]cid.Cid, 0, len(chain[len(chain)-1].Blocks))
|
||||||
|
for _, blk := range chain[len(chain)-1].Blocks {
|
||||||
|
if blk.Height == 0 || blk.Parents == nil {
|
||||||
|
reachedGenesis = true
|
||||||
|
}
|
||||||
|
nextCids = append(nextCids, blk.Cid())
|
||||||
|
}
|
||||||
|
nextReq.Start = nextCids
|
||||||
|
nextReq.RequestLength = req.RequestLength - totalDepth
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now pull the data we fetched out of the chainstore (where it should now be persisted)
|
return &BlockSyncResponse{Chain: wholeChain}, nil
|
||||||
tempcs := store.NewChainStore(bs.bserv.Blockstore(), datastore.NewMapDatastore(), nil)
|
|
||||||
|
|
||||||
opts := ParseBSOptions(req.Options)
|
|
||||||
tsk := types.NewTipSetKey(req.Start...)
|
|
||||||
chain, err := collectChainSegment(tempcs, tsk, req.RequestLength, opts)
|
|
||||||
if err != nil {
|
|
||||||
return nil, xerrors.Errorf("failed to load chain data from chainstore after successful graphsync response (start = %v): %w", req.Start, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return &BlockSyncResponse{Chain: chain}, nil
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user