102 lines
4.4 KiB
Go
102 lines
4.4 KiB
Go
package modules
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/ipfs/go-graphsync"
|
|
graphsyncimpl "github.com/ipfs/go-graphsync/impl"
|
|
gsnet "github.com/ipfs/go-graphsync/network"
|
|
"github.com/ipfs/go-graphsync/storeutil"
|
|
"github.com/libp2p/go-libp2p/core/host"
|
|
"github.com/libp2p/go-libp2p/core/peer"
|
|
"go.opencensus.io/stats"
|
|
"go.uber.org/fx"
|
|
|
|
"github.com/filecoin-project/lotus/metrics"
|
|
"github.com/filecoin-project/lotus/node/config"
|
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
|
"github.com/filecoin-project/lotus/node/modules/helpers"
|
|
"github.com/filecoin-project/lotus/node/repo"
|
|
)
|
|
|
|
// Graphsync creates a graphsync instance from the given loader and storer
|
|
func Graphsync(parallelTransfersForStorage uint64, parallelTransfersForRetrieval uint64) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ExposedBlockstore, h host.Host) (dtypes.Graphsync, error) {
|
|
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ExposedBlockstore, h host.Host) (dtypes.Graphsync, error) {
|
|
graphsyncNetwork := gsnet.NewFromLibp2pHost(h)
|
|
lsys := storeutil.LinkSystemForBlockstore(clientBs)
|
|
|
|
gs := graphsyncimpl.New(helpers.LifecycleCtx(mctx, lc),
|
|
graphsyncNetwork,
|
|
lsys,
|
|
graphsyncimpl.RejectAllRequestsByDefault(),
|
|
graphsyncimpl.MaxInProgressIncomingRequests(parallelTransfersForStorage),
|
|
graphsyncimpl.MaxInProgressOutgoingRequests(parallelTransfersForRetrieval),
|
|
graphsyncimpl.MaxLinksPerIncomingRequests(config.MaxTraversalLinks),
|
|
graphsyncimpl.MaxLinksPerOutgoingRequests(config.MaxTraversalLinks))
|
|
chainLinkSystem := storeutil.LinkSystemForBlockstore(chainBs)
|
|
err := gs.RegisterPersistenceOption("chainstore", chainLinkSystem)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
gs.RegisterIncomingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
|
|
_, has := requestData.Extension("chainsync")
|
|
if has {
|
|
// TODO: we should confirm the selector is a reasonable one before we validate
|
|
// TODO: this code will get more complicated and should probably not live here eventually
|
|
hookActions.ValidateRequest()
|
|
hookActions.UsePersistenceOption("chainstore")
|
|
}
|
|
})
|
|
gs.RegisterOutgoingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.OutgoingRequestHookActions) {
|
|
_, has := requestData.Extension("chainsync")
|
|
if has {
|
|
hookActions.UsePersistenceOption("chainstore")
|
|
}
|
|
})
|
|
|
|
graphsyncStats(mctx, lc, gs)
|
|
|
|
return gs, nil
|
|
}
|
|
}
|
|
|
|
func graphsyncStats(mctx helpers.MetricsCtx, lc fx.Lifecycle, gs dtypes.Graphsync) {
|
|
stopStats := make(chan struct{})
|
|
lc.Append(fx.Hook{
|
|
OnStart: func(context.Context) error {
|
|
go func() {
|
|
t := time.NewTicker(10 * time.Second)
|
|
for {
|
|
select {
|
|
case <-t.C:
|
|
|
|
st := gs.Stats()
|
|
stats.Record(mctx, metrics.GraphsyncReceivingPeersCount.M(int64(st.OutgoingRequests.TotalPeers)))
|
|
stats.Record(mctx, metrics.GraphsyncReceivingActiveCount.M(int64(st.OutgoingRequests.Active)))
|
|
stats.Record(mctx, metrics.GraphsyncReceivingCountCount.M(int64(st.OutgoingRequests.Pending)))
|
|
stats.Record(mctx, metrics.GraphsyncReceivingTotalMemoryAllocated.M(int64(st.IncomingResponses.TotalAllocatedAllPeers)))
|
|
stats.Record(mctx, metrics.GraphsyncReceivingTotalPendingAllocations.M(int64(st.IncomingResponses.TotalPendingAllocations)))
|
|
stats.Record(mctx, metrics.GraphsyncReceivingPeersPending.M(int64(st.IncomingResponses.NumPeersWithPendingAllocations)))
|
|
stats.Record(mctx, metrics.GraphsyncSendingPeersCount.M(int64(st.IncomingRequests.TotalPeers)))
|
|
stats.Record(mctx, metrics.GraphsyncSendingActiveCount.M(int64(st.IncomingRequests.Active)))
|
|
stats.Record(mctx, metrics.GraphsyncSendingCountCount.M(int64(st.IncomingRequests.Pending)))
|
|
stats.Record(mctx, metrics.GraphsyncSendingTotalMemoryAllocated.M(int64(st.OutgoingResponses.TotalAllocatedAllPeers)))
|
|
stats.Record(mctx, metrics.GraphsyncSendingTotalPendingAllocations.M(int64(st.OutgoingResponses.TotalPendingAllocations)))
|
|
stats.Record(mctx, metrics.GraphsyncSendingPeersPending.M(int64(st.OutgoingResponses.NumPeersWithPendingAllocations)))
|
|
|
|
case <-stopStats:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
return nil
|
|
},
|
|
OnStop: func(ctx context.Context) error {
|
|
close(stopStats)
|
|
return nil
|
|
},
|
|
})
|
|
}
|