some cleanup
This commit is contained in:
parent
2e9a052f62
commit
9d7f19b950
@ -19,8 +19,6 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// could be anything? <100 though I think is the base limit
|
|
||||||
recursionDepth = 50
|
|
||||||
|
|
||||||
// AMT selector recursion. An AMT has arity of 8 so this gives allows
|
// AMT selector recursion. An AMT has arity of 8 so this gives allows
|
||||||
// us to retrieve trees with 8^10 (1,073,741,824) elements.
|
// us to retrieve trees with 8^10 (1,073,741,824) elements.
|
||||||
@ -46,21 +44,11 @@ const (
|
|||||||
maxRequestLength = 50
|
maxRequestLength = 50
|
||||||
)
|
)
|
||||||
|
|
||||||
var blockHeadersSelector, fullSelector ipld.Node
|
|
||||||
|
|
||||||
var amtSelector selectorbuilder.SelectorSpec
|
var amtSelector selectorbuilder.SelectorSpec
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
// builer for selectors
|
// builer for selectors
|
||||||
ssb := selectorbuilder.NewSelectorSpecBuilder(ipldfree.NodeBuilder())
|
ssb := selectorbuilder.NewSelectorSpecBuilder(ipldfree.NodeBuilder())
|
||||||
// blockHeaders only selector
|
|
||||||
blockHeadersSelector = ssb.ExploreRecursive(ipldselector.RecursionLimitDepth(recursionDepth), ssb.ExploreIndex(blockIndexParentsField,
|
|
||||||
ssb.ExploreUnion(
|
|
||||||
ssb.ExploreAll(
|
|
||||||
ssb.Matcher(),
|
|
||||||
),
|
|
||||||
ssb.ExploreIndex(0, ssb.ExploreRecursiveEdge()),
|
|
||||||
))).Node()
|
|
||||||
// amt selector -- needed to selector through a messages AMT
|
// amt selector -- needed to selector through a messages AMT
|
||||||
amtSelector = ssb.ExploreIndex(amtHeadNodeFieldIndex,
|
amtSelector = ssb.ExploreIndex(amtHeadNodeFieldIndex,
|
||||||
ssb.ExploreRecursive(ipldselector.RecursionLimitDepth(int(amtRecursionDepth)),
|
ssb.ExploreRecursive(ipldselector.RecursionLimitDepth(int(amtRecursionDepth)),
|
||||||
@ -69,16 +57,6 @@ func init() {
|
|||||||
ssb.ExploreAll(ssb.ExploreRecursiveEdge())),
|
ssb.ExploreAll(ssb.ExploreRecursiveEdge())),
|
||||||
ssb.ExploreIndex(amtNodeValuesFieldIndex,
|
ssb.ExploreIndex(amtNodeValuesFieldIndex,
|
||||||
ssb.ExploreAll(ssb.Matcher())))))
|
ssb.ExploreAll(ssb.Matcher())))))
|
||||||
// messages too selector
|
|
||||||
fullSelector = ssb.ExploreRecursive(ipldselector.RecursionLimitDepth(recursionDepth),
|
|
||||||
ssb.ExploreIndex(blockIndexParentsField,
|
|
||||||
ssb.ExploreUnion(
|
|
||||||
ssb.ExploreAll(
|
|
||||||
ssb.ExploreIndex(blockIndexMessagesField,
|
|
||||||
ssb.ExploreRange(0, 2, amtSelector),
|
|
||||||
)),
|
|
||||||
ssb.ExploreIndex(0, ssb.ExploreRecursiveEdge()),
|
|
||||||
))).Node()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func selectorForRequest(req *BlockSyncRequest) ipld.Node {
|
func selectorForRequest(req *BlockSyncRequest) ipld.Node {
|
||||||
@ -104,7 +82,6 @@ func selectorForRequest(req *BlockSyncRequest) ipld.Node {
|
|||||||
),
|
),
|
||||||
ssb.ExploreIndex(0, ssb.ExploreRecursiveEdge()),
|
ssb.ExploreIndex(0, ssb.ExploreRecursiveEdge()),
|
||||||
))).Node()
|
))).Node()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func firstTipsetSelector(req *BlockSyncRequest) ipld.Node {
|
func firstTipsetSelector(req *BlockSyncRequest) ipld.Node {
|
||||||
@ -144,43 +121,26 @@ func (bs *BlockSync) fetchBlocksGraphSync(ctx context.Context, p peer.ID, req *B
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var totalDepth uint64 = 0
|
if req.RequestLength > maxRequestLength {
|
||||||
var nextReq BlockSyncRequest = *req
|
req.RequestLength = maxRequestLength
|
||||||
var wholeChain []*BSTipSet
|
|
||||||
var reachedGenesis bool
|
|
||||||
for totalDepth < req.RequestLength && !reachedGenesis {
|
|
||||||
if nextReq.RequestLength > maxRequestLength {
|
|
||||||
nextReq.RequestLength = maxRequestLength
|
|
||||||
}
|
|
||||||
|
|
||||||
sel := selectorForRequest(&nextReq)
|
|
||||||
|
|
||||||
// execute the selector forreal
|
|
||||||
if err := bs.executeGsyncSelector(ctx, p, req.Start[0], sel); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Now pull the data we fetched out of the chainstore (where it should now be persisted)
|
|
||||||
tempcs := store.NewChainStore(bs.bserv.Blockstore(), datastore.NewMapDatastore(), nil)
|
|
||||||
|
|
||||||
opts := ParseBSOptions(req.Options)
|
|
||||||
tsk := types.NewTipSetKey(req.Start...)
|
|
||||||
chain, err := collectChainSegment(tempcs, tsk, req.RequestLength, opts)
|
|
||||||
if err != nil {
|
|
||||||
return nil, xerrors.Errorf("failed to load chain data from chainstore after successful graphsync response (start = %v): %w", req.Start, err)
|
|
||||||
}
|
|
||||||
wholeChain = append(wholeChain, chain...)
|
|
||||||
totalDepth += nextReq.RequestLength
|
|
||||||
nextCids := make([]cid.Cid, 0, len(chain[len(chain)-1].Blocks))
|
|
||||||
for _, blk := range chain[len(chain)-1].Blocks {
|
|
||||||
if blk.Height == 0 || blk.Parents == nil {
|
|
||||||
reachedGenesis = true
|
|
||||||
}
|
|
||||||
nextCids = append(nextCids, blk.Cid())
|
|
||||||
}
|
|
||||||
nextReq.Start = nextCids
|
|
||||||
nextReq.RequestLength = req.RequestLength - totalDepth
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return &BlockSyncResponse{Chain: wholeChain}, nil
|
sel := selectorForRequest(req)
|
||||||
|
|
||||||
|
// execute the selector forreal
|
||||||
|
if err := bs.executeGsyncSelector(ctx, p, req.Start[0], sel); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now pull the data we fetched out of the chainstore (where it should now be persisted)
|
||||||
|
tempcs := store.NewChainStore(bs.bserv.Blockstore(), datastore.NewMapDatastore(), nil)
|
||||||
|
|
||||||
|
opts := ParseBSOptions(req.Options)
|
||||||
|
tsk := types.NewTipSetKey(req.Start...)
|
||||||
|
chain, err := collectChainSegment(tempcs, tsk, req.RequestLength, opts)
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("failed to load chain data from chainstore after successful graphsync response (start = %v): %w", req.Start, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &BlockSyncResponse{Chain: chain}, nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user