diff --git a/go.mod b/go.mod index 703c0dcf3..6fefa376a 100644 --- a/go.mod +++ b/go.mod @@ -77,7 +77,7 @@ require ( github.com/ipfs/go-ds-measure v0.1.0 github.com/ipfs/go-ds-pebble v0.0.2-0.20200921225637-ce220f8ac459 github.com/ipfs/go-fs-lock v0.0.6 - github.com/ipfs/go-graphsync v0.10.1 + github.com/ipfs/go-graphsync v0.10.4 github.com/ipfs/go-ipfs-blockstore v1.0.4 github.com/ipfs/go-ipfs-blocksutil v0.0.1 github.com/ipfs/go-ipfs-chunker v0.0.5 diff --git a/go.sum b/go.sum index 8afaf641f..440ed0c9a 100644 --- a/go.sum +++ b/go.sum @@ -743,8 +743,8 @@ github.com/ipfs/go-graphsync v0.1.0/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CE github.com/ipfs/go-graphsync v0.4.2/go.mod h1:/VmbZTUdUMTbNkgzAiCEucIIAU3BkLE2cZrDCVUhyi0= github.com/ipfs/go-graphsync v0.4.3/go.mod h1:mPOwDYv128gf8gxPFgXnz4fNrSYPsWyqisJ7ych+XDY= github.com/ipfs/go-graphsync v0.10.0/go.mod h1:cKIshzTaa5rCZjryH5xmSKZVGX9uk1wvwGvz2WEha5Y= -github.com/ipfs/go-graphsync v0.10.1 h1:m6nNwiRFE2FVBTCxHWVTRApjH0snIjFy7fkDbOlMa/I= -github.com/ipfs/go-graphsync v0.10.1/go.mod h1:cKIshzTaa5rCZjryH5xmSKZVGX9uk1wvwGvz2WEha5Y= +github.com/ipfs/go-graphsync v0.10.4 h1:1WZhyOPxgxLvHTIC2GoLltaBrjZ+JuXC2oKAEiX8f3Y= +github.com/ipfs/go-graphsync v0.10.4/go.mod h1:oei4tnWAKnZ6LPnapZGPYVVbyiKV1UP3f8BeLU7Z4JQ= github.com/ipfs/go-hamt-ipld v0.1.1/go.mod h1:1EZCr2v0jlCnhpa+aZ0JZYp8Tt2w16+JJOAVz17YcDk= github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08= github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw= @@ -842,8 +842,9 @@ github.com/ipfs/go-path v0.0.7/go.mod h1:6KTKmeRnBXgqrTvzFrPV3CamxcgvXX/4z79tfAd github.com/ipfs/go-peertaskqueue v0.0.4/go.mod h1:03H8fhyeMfKNFWqzYEVyMbcPUeYrqP1MX6Kd+aN+rMQ= github.com/ipfs/go-peertaskqueue v0.1.0/go.mod h1:Jmk3IyCcfl1W3jTW3YpghSwSEC6IJ3Vzz/jUmWw8Z0U= github.com/ipfs/go-peertaskqueue v0.1.1/go.mod h1:Jmk3IyCcfl1W3jTW3YpghSwSEC6IJ3Vzz/jUmWw8Z0U= -github.com/ipfs/go-peertaskqueue v0.2.0 h1:2cSr7exUGKYyDeUyQ7P/nHPs9P7Ht/B+ROrpN1EJOjc= github.com/ipfs/go-peertaskqueue v0.2.0/go.mod h1:5/eNrBEbtSKWCG+kQK8K8fGNixoYUnr+P7jivavs9lY= +github.com/ipfs/go-peertaskqueue v0.6.0 h1:BT1/PuNViVomiz1PnnP5+WmKsTNHrxIDvkZrkj4JhOg= +github.com/ipfs/go-peertaskqueue v0.6.0/go.mod h1:M/akTIE/z1jGNXMU7kFB4TeSEFvj68ow0Rrb04donIU= github.com/ipfs/go-todocounter v0.0.1/go.mod h1:l5aErvQc8qKE2r7NDMjmq5UNAvuZy0rC8BHOplkWvZ4= github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb/go.mod h1:IwAAgul1UQIcNZzKPYZWOCijryFBeCV79cNubPzol+k= github.com/ipfs/go-unixfs v0.2.4/go.mod h1:SUdisfUjNoSDzzhGVxvCL9QO/nKdwXdr+gbMUdqcbYw= diff --git a/metrics/metrics.go b/metrics/metrics.go index fd538839d..56e2e77f5 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -55,6 +55,22 @@ var ( PeerCount = stats.Int64("peer/count", "Current number of FIL peers", stats.UnitDimensionless) APIRequestDuration = stats.Float64("api/request_duration_ms", "Duration of API requests", stats.UnitMilliseconds) + // graphsync + + GraphsyncReceivingPeersCount = stats.Int64("graphsync/receiving_peers", "number of peers we are receiving graphsync data from", stats.UnitDimensionless) + GraphsyncReceivingActiveCount = stats.Int64("graphsync/receiving_active", "number of active receiving graphsync transfers", stats.UnitDimensionless) + GraphsyncReceivingCountCount = stats.Int64("graphsync/receiving_pending", "number of pending receiving graphsync transfers", stats.UnitDimensionless) + GraphsyncReceivingTotalMemoryAllocated = stats.Int64("graphsync/receiving_total_allocated", "amount of block memory allocated for receiving graphsync data", stats.UnitBytes) + GraphsyncReceivingTotalPendingAllocations = stats.Int64("graphsync/receiving_pending_allocations", "amount of block memory on hold being received pending allocation", stats.UnitBytes) + GraphsyncReceivingPeersPending = stats.Int64("graphsync/receiving_peers_pending", "number of peers we can't receive more data from cause of pending allocations", stats.UnitDimensionless) + + GraphsyncSendingPeersCount = stats.Int64("graphsync/sending_peers", "number of peers we are sending graphsync data to", stats.UnitDimensionless) + GraphsyncSendingActiveCount = stats.Int64("graphsync/sending_active", "number of active sending graphsync transfers", stats.UnitDimensionless) + GraphsyncSendingCountCount = stats.Int64("graphsync/sending_pending", "number of pending sending graphsync transfers", stats.UnitDimensionless) + GraphsyncSendingTotalMemoryAllocated = stats.Int64("graphsync/sending_total_allocated", "amount of block memory allocated for sending graphsync data", stats.UnitBytes) + GraphsyncSendingTotalPendingAllocations = stats.Int64("graphsync/sending_pending_allocations", "amount of block memory on hold from sending pending allocation", stats.UnitBytes) + GraphsyncSendingPeersPending = stats.Int64("graphsync/sending_peers_pending", "number of peers we can't send more data to cause of pending allocations", stats.UnitDimensionless) + // chain ChainNodeHeight = stats.Int64("chain/node_height", "Current Height of the node", stats.UnitDimensionless) ChainNodeHeightExpected = stats.Int64("chain/node_height_expected", "Expected Height of the node", stats.UnitDimensionless) @@ -380,6 +396,56 @@ var ( Measure: SplitstoreCompactionDead, Aggregation: view.Sum(), } + + // graphsync + GraphsyncReceivingPeersCountView = &view.View{ + Measure: GraphsyncReceivingPeersCount, + Aggregation: view.LastValue(), + } + GraphsyncReceivingActiveCountView = &view.View{ + Measure: GraphsyncReceivingActiveCount, + Aggregation: view.LastValue(), + } + GraphsyncReceivingCountCountView = &view.View{ + Measure: GraphsyncReceivingCountCount, + Aggregation: view.LastValue(), + } + GraphsyncReceivingTotalMemoryAllocatedView = &view.View{ + Measure: GraphsyncReceivingTotalMemoryAllocated, + Aggregation: view.LastValue(), + } + GraphsyncReceivingTotalPendingAllocationsView = &view.View{ + Measure: GraphsyncReceivingTotalPendingAllocations, + Aggregation: view.LastValue(), + } + GraphsyncReceivingPeersPendingView = &view.View{ + Measure: GraphsyncReceivingPeersPending, + Aggregation: view.LastValue(), + } + GraphsyncSendingPeersCountView = &view.View{ + Measure: GraphsyncSendingPeersCount, + Aggregation: view.LastValue(), + } + GraphsyncSendingActiveCountView = &view.View{ + Measure: GraphsyncSendingActiveCount, + Aggregation: view.LastValue(), + } + GraphsyncSendingCountCountView = &view.View{ + Measure: GraphsyncSendingCountCount, + Aggregation: view.LastValue(), + } + GraphsyncSendingTotalMemoryAllocatedView = &view.View{ + Measure: GraphsyncSendingTotalMemoryAllocated, + Aggregation: view.LastValue(), + } + GraphsyncSendingTotalPendingAllocationsView = &view.View{ + Measure: GraphsyncSendingTotalPendingAllocations, + Aggregation: view.LastValue(), + } + GraphsyncSendingPeersPendingView = &view.View{ + Measure: GraphsyncSendingPeersPending, + Aggregation: view.LastValue(), + } ) // DefaultViews is an array of OpenCensus views for metric gathering purposes @@ -388,6 +454,19 @@ var DefaultViews = func() []*view.View { InfoView, PeerCountView, APIRequestDurationView, + + GraphsyncReceivingPeersCountView, + GraphsyncReceivingActiveCountView, + GraphsyncReceivingCountCountView, + GraphsyncReceivingTotalMemoryAllocatedView, + GraphsyncReceivingTotalPendingAllocationsView, + GraphsyncReceivingPeersPendingView, + GraphsyncSendingPeersCountView, + GraphsyncSendingActiveCountView, + GraphsyncSendingCountCountView, + GraphsyncSendingTotalMemoryAllocatedView, + GraphsyncSendingTotalPendingAllocationsView, + GraphsyncSendingPeersPendingView, } views = append(views, blockstore.DefaultViews...) views = append(views, rpcmetrics.DefaultViews...) diff --git a/node/modules/graphsync.go b/node/modules/graphsync.go index 839508900..724c57ef0 100644 --- a/node/modules/graphsync.go +++ b/node/modules/graphsync.go @@ -1,14 +1,19 @@ package modules import ( + "context" + "time" + "github.com/ipfs/go-graphsync" graphsyncimpl "github.com/ipfs/go-graphsync/impl" gsnet "github.com/ipfs/go-graphsync/network" "github.com/ipfs/go-graphsync/storeutil" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" + "go.opencensus.io/stats" "go.uber.org/fx" + "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/helpers" @@ -49,6 +54,48 @@ func Graphsync(parallelTransfersForStorage uint64, parallelTransfersForRetrieval hookActions.UsePersistenceOption("chainstore") } }) + + graphsyncStats(mctx, lc, gs) + return gs, nil } } + +func graphsyncStats(mctx helpers.MetricsCtx, lc fx.Lifecycle, gs dtypes.Graphsync) { + stopStats := make(chan struct{}) + lc.Append(fx.Hook{ + OnStart: func(context.Context) error { + go func() { + t := time.NewTicker(10 * time.Second) + for { + select { + case <-t.C: + + st := gs.Stats() + stats.Record(mctx, metrics.GraphsyncReceivingPeersCount.M(int64(st.OutgoingRequests.TotalPeers))) + stats.Record(mctx, metrics.GraphsyncReceivingActiveCount.M(int64(st.OutgoingRequests.Active))) + stats.Record(mctx, metrics.GraphsyncReceivingCountCount.M(int64(st.OutgoingRequests.Pending))) + stats.Record(mctx, metrics.GraphsyncReceivingTotalMemoryAllocated.M(int64(st.IncomingResponses.TotalAllocatedAllPeers))) + stats.Record(mctx, metrics.GraphsyncReceivingTotalPendingAllocations.M(int64(st.IncomingResponses.TotalPendingAllocations))) + stats.Record(mctx, metrics.GraphsyncReceivingPeersPending.M(int64(st.IncomingResponses.NumPeersWithPendingAllocations))) + stats.Record(mctx, metrics.GraphsyncSendingPeersCount.M(int64(st.IncomingRequests.TotalPeers))) + stats.Record(mctx, metrics.GraphsyncSendingActiveCount.M(int64(st.IncomingRequests.Active))) + stats.Record(mctx, metrics.GraphsyncSendingCountCount.M(int64(st.IncomingRequests.Pending))) + stats.Record(mctx, metrics.GraphsyncSendingTotalMemoryAllocated.M(int64(st.OutgoingResponses.TotalAllocatedAllPeers))) + stats.Record(mctx, metrics.GraphsyncSendingTotalPendingAllocations.M(int64(st.OutgoingResponses.TotalPendingAllocations))) + stats.Record(mctx, metrics.GraphsyncSendingPeersPending.M(int64(st.OutgoingResponses.NumPeersWithPendingAllocations))) + + case <-stopStats: + return + } + } + }() + + return nil + }, + OnStop: func(ctx context.Context) error { + close(stopStats) + return nil + }, + }) +} diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index b32cbe9e0..1a2dfc19f 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -408,6 +408,8 @@ func StagingGraphsync(parallelTransfersForStorage uint64, parallelTransfersForRe graphsyncimpl.MaxLinksPerIncomingRequests(config.MaxTraversalLinks), graphsyncimpl.MaxLinksPerOutgoingRequests(config.MaxTraversalLinks)) + graphsyncStats(mctx, lc, gs) + return gs } }