package modules

import (
	"context"
	"time"

	"github.com/ipfs/go-graphsync"
	graphsyncimpl "github.com/ipfs/go-graphsync/impl"
	gsnet "github.com/ipfs/go-graphsync/network"
	"github.com/ipfs/go-graphsync/storeutil"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/peer"
	"go.opencensus.io/stats"
	"go.uber.org/fx"

	"github.com/filecoin-project/lotus/metrics"
	"github.com/filecoin-project/lotus/node/config"
	"github.com/filecoin-project/lotus/node/modules/dtypes"
	"github.com/filecoin-project/lotus/node/modules/helpers"
	"github.com/filecoin-project/lotus/node/repo"
)

// Graphsync returns a constructor that builds a graphsync instance on top of
// the libp2p host h.
//
// The instance uses a link system backed by clientBs by default and registers
// a second link system, backed by chainBs, under the persistence option name
// "chainstore". Requests (incoming or outgoing) that carry the "chainsync"
// extension are routed to that persistence option.
//
// parallelTransfersForStorage is passed to MaxInProgressIncomingRequests and
// parallelTransfersForRetrieval to MaxInProgressOutgoingRequests; both
// directions cap traversals at config.MaxTraversalLinks links.
//
// The constructor returns an error if the "chainstore" persistence option
// cannot be registered. On success it also installs a lifecycle hook that
// periodically records graphsync statistics as metrics.
func Graphsync(parallelTransfersForStorage uint64, parallelTransfersForRetrieval uint64) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ExposedBlockstore, h host.Host) (dtypes.Graphsync, error) {
	return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ExposedBlockstore, h host.Host) (dtypes.Graphsync, error) {
		graphsyncNetwork := gsnet.NewFromLibp2pHost(h)
		lsys := storeutil.LinkSystemForBlockstore(clientBs)

		gs := graphsyncimpl.New(helpers.LifecycleCtx(mctx, lc),
			graphsyncNetwork,
			lsys,
			graphsyncimpl.RejectAllRequestsByDefault(),
			graphsyncimpl.MaxInProgressIncomingRequests(parallelTransfersForStorage),
			graphsyncimpl.MaxInProgressOutgoingRequests(parallelTransfersForRetrieval),
			graphsyncimpl.MaxLinksPerIncomingRequests(config.MaxTraversalLinks),
			graphsyncimpl.MaxLinksPerOutgoingRequests(config.MaxTraversalLinks))

		chainLinkSystem := storeutil.LinkSystemForBlockstore(chainBs)
		if err := gs.RegisterPersistenceOption("chainstore", chainLinkSystem); err != nil {
			return nil, err
		}

		// The instance is created with RejectAllRequestsByDefault, so this hook
		// is what explicitly validates incoming requests that carry the
		// "chainsync" extension.
		gs.RegisterIncomingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
			if _, has := requestData.Extension("chainsync"); has {
				// TODO: we should confirm the selector is a reasonable one before we validate
				// TODO: this code will get more complicated and should probably not live here eventually
				hookActions.ValidateRequest()
				hookActions.UsePersistenceOption("chainstore")
			}
		})

		// Outgoing chainsync requests use the same "chainstore" persistence option.
		gs.RegisterOutgoingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.OutgoingRequestHookActions) {
			if _, has := requestData.Extension("chainsync"); has {
				hookActions.UsePersistenceOption("chainstore")
			}
		})

		graphsyncStats(mctx, lc, gs)

		return gs, nil
	}
}

// graphsyncStats appends a lifecycle hook to lc that, once started, samples
// gs.Stats() every 10 seconds and records the values as metrics against mctx.
// The sampling goroutine exits when the hook's OnStop closes its stop channel.
func graphsyncStats(mctx helpers.MetricsCtx, lc fx.Lifecycle, gs dtypes.Graphsync) {
	const interval = 10 * time.Second

	stopStats := make(chan struct{})
	lc.Append(fx.Hook{
		OnStart: func(context.Context) error {
			go func() {
				t := time.NewTicker(interval)
				// Release the ticker's resources once the goroutine exits.
				defer t.Stop()

				for {
					select {
					case <-t.C:
						st := gs.Stats()
						// Record every measurement of one sample in a single call.
						stats.Record(mctx,
							metrics.GraphsyncReceivingPeersCount.M(int64(st.OutgoingRequests.TotalPeers)),
							metrics.GraphsyncReceivingActiveCount.M(int64(st.OutgoingRequests.Active)),
							metrics.GraphsyncReceivingCountCount.M(int64(st.OutgoingRequests.Pending)),
							metrics.GraphsyncReceivingTotalMemoryAllocated.M(int64(st.IncomingResponses.TotalAllocatedAllPeers)),
							metrics.GraphsyncReceivingTotalPendingAllocations.M(int64(st.IncomingResponses.TotalPendingAllocations)),
							metrics.GraphsyncReceivingPeersPending.M(int64(st.IncomingResponses.NumPeersWithPendingAllocations)),

							metrics.GraphsyncSendingPeersCount.M(int64(st.IncomingRequests.TotalPeers)),
							metrics.GraphsyncSendingActiveCount.M(int64(st.IncomingRequests.Active)),
							metrics.GraphsyncSendingCountCount.M(int64(st.IncomingRequests.Pending)),
							metrics.GraphsyncSendingTotalMemoryAllocated.M(int64(st.OutgoingResponses.TotalAllocatedAllPeers)),
							metrics.GraphsyncSendingTotalPendingAllocations.M(int64(st.OutgoingResponses.TotalPendingAllocations)),
							metrics.GraphsyncSendingPeersPending.M(int64(st.OutgoingResponses.NumPeersWithPendingAllocations)),
						)
					case <-stopStats:
						return
					}
				}
			}()

			return nil
		},
		OnStop: func(ctx context.Context) error {
			close(stopStats)
			return nil
		},
	})
}