diff --git a/chain/blocksync/graphsync_client.go b/chain/blocksync/graphsync_client.go index 33f91fdd7..5d4af01cb 100644 --- a/chain/blocksync/graphsync_client.go +++ b/chain/blocksync/graphsync_client.go @@ -93,35 +93,38 @@ func selectorForRequest(req *BlockSyncRequest) ipld.Node { )), ssb.ExploreIndex(0, ssb.ExploreRecursiveEdge()), ))).Node() - } else { - return ssb.ExploreRecursive(ipldselector.RecursionLimitDepth(int(req.RequestLength)), ssb.ExploreIndex(blockIndexParentsField, - ssb.ExploreUnion( - ssb.ExploreAll( - ssb.Matcher(), - ), - ssb.ExploreIndex(0, ssb.ExploreRecursiveEdge()), - ))).Node() } + return ssb.ExploreRecursive(ipldselector.RecursionLimitDepth(int(req.RequestLength)), ssb.ExploreIndex(blockIndexParentsField, + ssb.ExploreUnion( + ssb.ExploreAll( + ssb.Matcher(), + ), + ssb.ExploreIndex(0, ssb.ExploreRecursiveEdge()), + ))).Node() + +} + +func firstTipsetSelector(req *BlockSyncRequest) ipld.Node { + // builer for selectors + ssb := selectorbuilder.NewSelectorSpecBuilder(ipldfree.NodeBuilder()) + + bso := ParseBSOptions(req.Options) + if bso.IncludeMessages { + return ssb.ExploreIndex(blockIndexMessagesField, + ssb.ExploreRange(0, 2, amtSelector), + ).Node() + } + return ssb.Matcher().Node() } func (bs *BlockSync) executeGsyncSelector(ctx context.Context, p peer.ID, root cid.Cid, sel ipld.Node) error { - resp, errs := bs.gsync.Request(ctx, p, cidlink.Link{Cid: root}, sel) + _, errs := bs.gsync.Request(ctx, p, cidlink.Link{Cid: root}, sel) - for { - select { - case _, ok := <-resp: - if !ok { - return nil - } - case err, ok := <-errs: - if !ok { - return nil - } - return xerrors.Errorf("failed to complete graphsync request: %w", err) - } + for err := range errs { + return xerrors.Errorf("failed to complete graphsync request: %w", err) } - + return nil } // Fallback for interacting with other non-lotus nodes @@ -138,9 +141,7 @@ func (bs *BlockSync) fetchBlocksGraphSync(ctx context.Context, p peer.ID, req *B } } - req2 := *req - req2.RequestLength = 1 - immediateTsSelector := selectorForRequest(&req2) + immediateTsSelector := firstTipsetSelector(req) // Do this because we can only request one root at a time for _, r := range req.Start { diff --git a/node/modules/graphsync.go b/node/modules/graphsync.go index 13c1dde10..ec1d7e903 100644 --- a/node/modules/graphsync.go +++ b/node/modules/graphsync.go @@ -15,8 +15,8 @@ import ( ) // GraphsyncStorer creates a storer that stores data in the client blockstore -func GraphsyncStorer(clientBs dtypes.ClientBlockstore) dtypes.GraphsyncStorer { - return dtypes.GraphsyncStorer(storeutil.StorerForBlockstore(clientBs)) +func GraphsyncStorer(chainBs dtypes.ChainBlockstore) dtypes.GraphsyncStorer { + return dtypes.GraphsyncStorer(storeutil.StorerForBlockstore(chainBs)) } // GraphsyncLoader creates a loader that reads from both the chain blockstore and the client blockstore