152 lines
		
	
	
		
			4.5 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			152 lines
		
	
	
		
			4.5 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package blocksync
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
 | 
						|
	"github.com/ipfs/go-cid"
 | 
						|
	"github.com/ipfs/go-datastore"
 | 
						|
	"github.com/ipfs/go-graphsync"
 | 
						|
	"github.com/ipld/go-ipld-prime"
 | 
						|
	"github.com/libp2p/go-libp2p-core/peer"
 | 
						|
	"golang.org/x/xerrors"
 | 
						|
 | 
						|
	store "github.com/filecoin-project/lotus/chain/store"
 | 
						|
	"github.com/filecoin-project/lotus/chain/types"
 | 
						|
 | 
						|
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
 | 
						|
	basicnode "github.com/ipld/go-ipld-prime/node/basic"
 | 
						|
	ipldselector "github.com/ipld/go-ipld-prime/traversal/selector"
 | 
						|
	selectorbuilder "github.com/ipld/go-ipld-prime/traversal/selector/builder"
 | 
						|
)
 | 
						|
 | 
						|
const (
 | 
						|
 | 
						|
	// AMT selector recursion. An AMT has arity of 8 so this gives allows
 | 
						|
	// us to retrieve trees with 8^10 (1,073,741,824) elements.
 | 
						|
	amtRecursionDepth = uint32(10)
 | 
						|
 | 
						|
	// some constants for looking up tuple encoded struct fields
 | 
						|
	// field index of Parents field in a block header
 | 
						|
	blockIndexParentsField = 5
 | 
						|
 | 
						|
	// field index of Messages field in a block header
 | 
						|
	blockIndexMessagesField = 10
 | 
						|
 | 
						|
	// field index of AMT node in AMT head
 | 
						|
	amtHeadNodeFieldIndex = 2
 | 
						|
 | 
						|
	// field index of links array AMT node
 | 
						|
	amtNodeLinksFieldIndex = 1
 | 
						|
 | 
						|
	// field index of values array AMT node
 | 
						|
	amtNodeValuesFieldIndex = 2
 | 
						|
 | 
						|
	// maximum depth per traversal
 | 
						|
	maxRequestLength = 50
 | 
						|
)
 | 
						|
 | 
						|
var amtSelector selectorbuilder.SelectorSpec
 | 
						|
 | 
						|
func init() {
 | 
						|
	// builer for selectors
 | 
						|
	ssb := selectorbuilder.NewSelectorSpecBuilder(basicnode.Style.Any)
 | 
						|
	// amt selector -- needed to selector through a messages AMT
 | 
						|
	amtSelector = ssb.ExploreIndex(amtHeadNodeFieldIndex,
 | 
						|
		ssb.ExploreRecursive(ipldselector.RecursionLimitDepth(int(amtRecursionDepth)),
 | 
						|
			ssb.ExploreUnion(
 | 
						|
				ssb.ExploreIndex(amtNodeLinksFieldIndex,
 | 
						|
					ssb.ExploreAll(ssb.ExploreRecursiveEdge())),
 | 
						|
				ssb.ExploreIndex(amtNodeValuesFieldIndex,
 | 
						|
					ssb.ExploreAll(ssb.Matcher())))))
 | 
						|
}
 | 
						|
 | 
						|
func selectorForRequest(req *BlockSyncRequest) ipld.Node {
 | 
						|
	// builer for selectors
 | 
						|
	ssb := selectorbuilder.NewSelectorSpecBuilder(basicnode.Style.Any)
 | 
						|
 | 
						|
	bso := ParseBSOptions(req.Options)
 | 
						|
	if bso.IncludeMessages {
 | 
						|
		return ssb.ExploreRecursive(ipldselector.RecursionLimitDepth(int(req.RequestLength)),
 | 
						|
			ssb.ExploreIndex(blockIndexParentsField,
 | 
						|
				ssb.ExploreUnion(
 | 
						|
					ssb.ExploreAll(
 | 
						|
						ssb.ExploreIndex(blockIndexMessagesField,
 | 
						|
							ssb.ExploreRange(0, 2, amtSelector),
 | 
						|
						)),
 | 
						|
					ssb.ExploreIndex(0, ssb.ExploreRecursiveEdge()),
 | 
						|
				))).Node()
 | 
						|
	}
 | 
						|
	return ssb.ExploreRecursive(ipldselector.RecursionLimitDepth(int(req.RequestLength)), ssb.ExploreIndex(blockIndexParentsField,
 | 
						|
		ssb.ExploreUnion(
 | 
						|
			ssb.ExploreAll(
 | 
						|
				ssb.Matcher(),
 | 
						|
			),
 | 
						|
			ssb.ExploreIndex(0, ssb.ExploreRecursiveEdge()),
 | 
						|
		))).Node()
 | 
						|
}
 | 
						|
 | 
						|
func firstTipsetSelector(req *BlockSyncRequest) ipld.Node {
 | 
						|
	// builer for selectors
 | 
						|
	ssb := selectorbuilder.NewSelectorSpecBuilder(basicnode.Style.Any)
 | 
						|
 | 
						|
	bso := ParseBSOptions(req.Options)
 | 
						|
	if bso.IncludeMessages {
 | 
						|
		return ssb.ExploreIndex(blockIndexMessagesField,
 | 
						|
			ssb.ExploreRange(0, 2, amtSelector),
 | 
						|
		).Node()
 | 
						|
	}
 | 
						|
	return ssb.Matcher().Node()
 | 
						|
 | 
						|
}
 | 
						|
 | 
						|
func (bs *BlockSync) executeGsyncSelector(ctx context.Context, p peer.ID, root cid.Cid, sel ipld.Node) error {
 | 
						|
	extension := graphsync.ExtensionData{
 | 
						|
		Name: "chainsync",
 | 
						|
		Data: nil,
 | 
						|
	}
 | 
						|
	_, errs := bs.gsync.Request(ctx, p, cidlink.Link{Cid: root}, sel, extension)
 | 
						|
 | 
						|
	for err := range errs {
 | 
						|
		return xerrors.Errorf("failed to complete graphsync request: %w", err)
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// Fallback for interacting with other non-lotus nodes
 | 
						|
func (bs *BlockSync) fetchBlocksGraphSync(ctx context.Context, p peer.ID, req *BlockSyncRequest) (*BlockSyncResponse, error) {
 | 
						|
	ctx, cancel := context.WithCancel(ctx)
 | 
						|
	defer cancel()
 | 
						|
 | 
						|
	immediateTsSelector := firstTipsetSelector(req)
 | 
						|
 | 
						|
	// Do this because we can only request one root at a time
 | 
						|
	for _, r := range req.Start {
 | 
						|
		if err := bs.executeGsyncSelector(ctx, p, r, immediateTsSelector); err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if req.RequestLength > maxRequestLength {
 | 
						|
		req.RequestLength = maxRequestLength
 | 
						|
	}
 | 
						|
 | 
						|
	sel := selectorForRequest(req)
 | 
						|
 | 
						|
	// execute the selector forreal
 | 
						|
	if err := bs.executeGsyncSelector(ctx, p, req.Start[0], sel); err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	// Now pull the data we fetched out of the chainstore (where it should now be persisted)
 | 
						|
	tempcs := store.NewChainStore(bs.bserv.Blockstore(), datastore.NewMapDatastore(), nil)
 | 
						|
 | 
						|
	opts := ParseBSOptions(req.Options)
 | 
						|
	tsk := types.NewTipSetKey(req.Start...)
 | 
						|
	chain, err := collectChainSegment(tempcs, tsk, req.RequestLength, opts)
 | 
						|
	if err != nil {
 | 
						|
		return nil, xerrors.Errorf("failed to load chain data from chainstore after successful graphsync response (start = %v): %w", req.Start, err)
 | 
						|
	}
 | 
						|
 | 
						|
	return &BlockSyncResponse{Chain: chain}, nil
 | 
						|
}
 |