feat(chainsync): fixes to make graphsync work for chain fetching

- store to chain blockstore (ok for now, since storage provider is a seperate process)
- simplify request fetching and processing
This commit is contained in:
hannahhoward 2020-03-19 14:28:59 -07:00 committed by whyrusleeping
parent 1076a1a89d
commit 34f755b2b9
2 changed files with 28 additions and 27 deletions

View File

@ -93,35 +93,38 @@ func selectorForRequest(req *BlockSyncRequest) ipld.Node {
)),
ssb.ExploreIndex(0, ssb.ExploreRecursiveEdge()),
))).Node()
} else {
return ssb.ExploreRecursive(ipldselector.RecursionLimitDepth(int(req.RequestLength)), ssb.ExploreIndex(blockIndexParentsField,
ssb.ExploreUnion(
ssb.ExploreAll(
ssb.Matcher(),
),
ssb.ExploreIndex(0, ssb.ExploreRecursiveEdge()),
))).Node()
}
return ssb.ExploreRecursive(ipldselector.RecursionLimitDepth(int(req.RequestLength)), ssb.ExploreIndex(blockIndexParentsField,
ssb.ExploreUnion(
ssb.ExploreAll(
ssb.Matcher(),
),
ssb.ExploreIndex(0, ssb.ExploreRecursiveEdge()),
))).Node()
}
func firstTipsetSelector(req *BlockSyncRequest) ipld.Node {
// builer for selectors
ssb := selectorbuilder.NewSelectorSpecBuilder(ipldfree.NodeBuilder())
bso := ParseBSOptions(req.Options)
if bso.IncludeMessages {
return ssb.ExploreIndex(blockIndexMessagesField,
ssb.ExploreRange(0, 2, amtSelector),
).Node()
}
return ssb.Matcher().Node()
}
func (bs *BlockSync) executeGsyncSelector(ctx context.Context, p peer.ID, root cid.Cid, sel ipld.Node) error {
resp, errs := bs.gsync.Request(ctx, p, cidlink.Link{Cid: root}, sel)
_, errs := bs.gsync.Request(ctx, p, cidlink.Link{Cid: root}, sel)
for {
select {
case _, ok := <-resp:
if !ok {
return nil
}
case err, ok := <-errs:
if !ok {
return nil
}
return xerrors.Errorf("failed to complete graphsync request: %w", err)
}
for err := range errs {
return xerrors.Errorf("failed to complete graphsync request: %w", err)
}
return nil
}
// Fallback for interacting with other non-lotus nodes
@ -138,9 +141,7 @@ func (bs *BlockSync) fetchBlocksGraphSync(ctx context.Context, p peer.ID, req *B
}
}
req2 := *req
req2.RequestLength = 1
immediateTsSelector := selectorForRequest(&req2)
immediateTsSelector := firstTipsetSelector(req)
// Do this because we can only request one root at a time
for _, r := range req.Start {

View File

@ -15,8 +15,8 @@ import (
)
// GraphsyncStorer creates a storer that stores data in the client blockstore
func GraphsyncStorer(clientBs dtypes.ClientBlockstore) dtypes.GraphsyncStorer {
return dtypes.GraphsyncStorer(storeutil.StorerForBlockstore(clientBs))
func GraphsyncStorer(chainBs dtypes.ChainBlockstore) dtypes.GraphsyncStorer {
return dtypes.GraphsyncStorer(storeutil.StorerForBlockstore(chainBs))
}
// GraphsyncLoader creates a loader that reads from both the chain blockstore and the client blockstore