diff --git a/node/modules/graphsync.go b/node/modules/graphsync.go index 36f7dfbf2..724c57ef0 100644 --- a/node/modules/graphsync.go +++ b/node/modules/graphsync.go @@ -4,15 +4,14 @@ import ( "context" "time" - "go.opencensus.io/stats" - "go.uber.org/fx" - "github.com/ipfs/go-graphsync" graphsyncimpl "github.com/ipfs/go-graphsync/impl" gsnet "github.com/ipfs/go-graphsync/network" "github.com/ipfs/go-graphsync/storeutil" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" + "go.opencensus.io/stats" + "go.uber.org/fx" "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/node/config" @@ -56,43 +55,47 @@ func Graphsync(parallelTransfersForStorage uint64, parallelTransfersForRetrieval } }) - stopStats := make(chan struct{}) - lc.Append(fx.Hook{ - OnStart: func(context.Context) error { - go func() { - t := time.NewTicker(10 * time.Second) - for { - select { - case <-t.C: - - st := gs.Stats() - stats.Record(mctx, metrics.GraphsyncReceivingPeersCount.M(int64(st.OutgoingRequests.TotalPeers))) - stats.Record(mctx, metrics.GraphsyncReceivingActiveCount.M(int64(st.OutgoingRequests.Active))) - stats.Record(mctx, metrics.GraphsyncReceivingCountCount.M(int64(st.OutgoingRequests.Pending))) - stats.Record(mctx, metrics.GraphsyncReceivingTotalMemoryAllocated.M(int64(st.IncomingResponses.TotalAllocatedAllPeers))) - stats.Record(mctx, metrics.GraphsyncReceivingTotalPendingAllocations.M(int64(st.IncomingResponses.TotalPendingAllocations))) - stats.Record(mctx, metrics.GraphsyncReceivingPeersPending.M(int64(st.IncomingResponses.NumPeersWithPendingAllocations))) - stats.Record(mctx, metrics.GraphsyncSendingPeersCount.M(int64(st.IncomingRequests.TotalPeers))) - stats.Record(mctx, metrics.GraphsyncSendingActiveCount.M(int64(st.IncomingRequests.Active))) - stats.Record(mctx, metrics.GraphsyncSendingCountCount.M(int64(st.IncomingRequests.Pending))) - stats.Record(mctx, metrics.GraphsyncSendingTotalMemoryAllocated.M(int64(st.OutgoingResponses.TotalAllocatedAllPeers))) - stats.Record(mctx, metrics.GraphsyncSendingTotalPendingAllocations.M(int64(st.OutgoingResponses.TotalPendingAllocations))) - stats.Record(mctx, metrics.GraphsyncSendingPeersPending.M(int64(st.OutgoingResponses.NumPeersWithPendingAllocations))) - - case <-stopStats: - return - } - } - }() - - return nil - }, - OnStop: func(ctx context.Context) error { - close(stopStats) - return nil - }, - }) + graphsyncStats(mctx, lc, gs) return gs, nil } } + +func graphsyncStats(mctx helpers.MetricsCtx, lc fx.Lifecycle, gs dtypes.Graphsync) { + stopStats := make(chan struct{}) + lc.Append(fx.Hook{ + OnStart: func(context.Context) error { + go func() { + t := time.NewTicker(10 * time.Second) + for { + select { + case <-t.C: + + st := gs.Stats() + stats.Record(mctx, metrics.GraphsyncReceivingPeersCount.M(int64(st.OutgoingRequests.TotalPeers))) + stats.Record(mctx, metrics.GraphsyncReceivingActiveCount.M(int64(st.OutgoingRequests.Active))) + stats.Record(mctx, metrics.GraphsyncReceivingCountCount.M(int64(st.OutgoingRequests.Pending))) + stats.Record(mctx, metrics.GraphsyncReceivingTotalMemoryAllocated.M(int64(st.IncomingResponses.TotalAllocatedAllPeers))) + stats.Record(mctx, metrics.GraphsyncReceivingTotalPendingAllocations.M(int64(st.IncomingResponses.TotalPendingAllocations))) + stats.Record(mctx, metrics.GraphsyncReceivingPeersPending.M(int64(st.IncomingResponses.NumPeersWithPendingAllocations))) + stats.Record(mctx, metrics.GraphsyncSendingPeersCount.M(int64(st.IncomingRequests.TotalPeers))) + stats.Record(mctx, metrics.GraphsyncSendingActiveCount.M(int64(st.IncomingRequests.Active))) + stats.Record(mctx, metrics.GraphsyncSendingCountCount.M(int64(st.IncomingRequests.Pending))) + stats.Record(mctx, metrics.GraphsyncSendingTotalMemoryAllocated.M(int64(st.OutgoingResponses.TotalAllocatedAllPeers))) + stats.Record(mctx, metrics.GraphsyncSendingTotalPendingAllocations.M(int64(st.OutgoingResponses.TotalPendingAllocations))) + stats.Record(mctx, metrics.GraphsyncSendingPeersPending.M(int64(st.OutgoingResponses.NumPeersWithPendingAllocations))) + + case <-stopStats: + return + } + } + }() + + return nil + }, + OnStop: func(ctx context.Context) error { + close(stopStats) + return nil + }, + }) +} diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index b32cbe9e0..1a2dfc19f 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -408,6 +408,8 @@ func StagingGraphsync(parallelTransfersForStorage uint64, parallelTransfersForRe graphsyncimpl.MaxLinksPerIncomingRequests(config.MaxTraversalLinks), graphsyncimpl.MaxLinksPerOutgoingRequests(config.MaxTraversalLinks)) + graphsyncStats(mctx, lc, gs) + return gs } }