diff --git a/metrics/metrics.go b/metrics/metrics.go index fd538839d..5dcc248e6 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -55,6 +55,22 @@ var ( PeerCount = stats.Int64("peer/count", "Current number of FIL peers", stats.UnitDimensionless) APIRequestDuration = stats.Float64("api/request_duration_ms", "Duration of API requests", stats.UnitMilliseconds) + // graphsync + + GraphsyncReceivingPeersCount = stats.Int64("graphsync/receiving_peers", "number of peers we are receiving graphsync data from", stats.UnitDimensionless) + GraphsyncReceivingActiveCount = stats.Int64("graphsync/receiving_active", "number of active receiving graphsync transfers", stats.UnitDimensionless) + GraphsyncReceivingCountCount = stats.Int64("graphsync/receiving_pending", "number of pending receiving graphsync transfers", stats.UnitDimensionless) + GraphsyncReceivingTotalMemoryAllocated = stats.Int64("graphsync/receiving_total_allocated", "amount of block memory allocated for receiving graphsync data", stats.UnitDimensionless) + GraphsyncReceivingTotalPendingAllocations = stats.Int64("graphsync/receiving_pending_allocations", "amount of block memory on hold being received pending allocation", stats.UnitDimensionless) + GraphsyncReceivingPeersPending = stats.Int64("graphsync/receiving_peers_pending", "number of peers we can't receive more data from cause of pending allocations", stats.UnitDimensionless) + + GraphsyncSendingPeersCount = stats.Int64("graphsync/sending_peers", "number of peers we are sending graphsync data to", stats.UnitDimensionless) + GraphsyncSendingActiveCount = stats.Int64("graphsync/sending_active", "number of active sending graphsync transfers", stats.UnitDimensionless) + GraphsyncSendingCountCount = stats.Int64("graphsync/sending_pending", "number of pending sending graphsync transfers", stats.UnitDimensionless) + GraphsyncSendingTotalMemoryAllocated = stats.Int64("graphsync/sending_total_allocated", "amount of block memory allocated for sending graphsync data", stats.UnitDimensionless) + GraphsyncSendingTotalPendingAllocations = stats.Int64("graphsync/sending_pending_allocations", "amount of block memory on hold from sending pending allocation", stats.UnitDimensionless) + GraphsyncSendingPeersPending = stats.Int64("graphsync/sending_peers_pending", "number of peers we can't send more data to cause of pending allocations", stats.UnitDimensionless) + // chain ChainNodeHeight = stats.Int64("chain/node_height", "Current Height of the node", stats.UnitDimensionless) ChainNodeHeightExpected = stats.Int64("chain/node_height_expected", "Expected Height of the node", stats.UnitDimensionless) @@ -380,6 +396,56 @@ var ( Measure: SplitstoreCompactionDead, Aggregation: view.Sum(), } + + // graphsync + GraphsyncReceivingPeersCountView = &view.View{ + Measure: GraphsyncReceivingPeersCount, + Aggregation: view.LastValue(), + } + GraphsyncReceivingActiveCountView = &view.View{ + Measure: GraphsyncReceivingActiveCount, + Aggregation: view.LastValue(), + } + GraphsyncReceivingCountCountView = &view.View{ + Measure: GraphsyncReceivingCountCount, + Aggregation: view.LastValue(), + } + GraphsyncReceivingTotalMemoryAllocatedView = &view.View{ + Measure: GraphsyncReceivingTotalMemoryAllocated, + Aggregation: view.LastValue(), + } + GraphsyncReceivingTotalPendingAllocationsView = &view.View{ + Measure: GraphsyncReceivingTotalPendingAllocations, + Aggregation: view.LastValue(), + } + GraphsyncReceivingPeersPendingView = &view.View{ + Measure: GraphsyncReceivingPeersPending, + Aggregation: view.LastValue(), + } + GraphsyncSendingPeersCountView = &view.View{ + Measure: GraphsyncSendingPeersCount, + Aggregation: view.LastValue(), + } + GraphsyncSendingActiveCountView = &view.View{ + Measure: GraphsyncSendingActiveCount, + Aggregation: view.LastValue(), + } + GraphsyncSendingCountCountView = &view.View{ + Measure: GraphsyncSendingCountCount, + Aggregation: view.LastValue(), + } + GraphsyncSendingTotalMemoryAllocatedView = &view.View{ + Measure: GraphsyncSendingTotalMemoryAllocated, + Aggregation: view.LastValue(), + } + GraphsyncSendingTotalPendingAllocationsView = &view.View{ + Measure: GraphsyncSendingTotalPendingAllocations, + Aggregation: view.LastValue(), + } + GraphsyncSendingPeersPendingView = &view.View{ + Measure: GraphsyncSendingPeersPending, + Aggregation: view.LastValue(), + } ) // DefaultViews is an array of OpenCensus views for metric gathering purposes @@ -388,6 +454,19 @@ var DefaultViews = func() []*view.View { InfoView, PeerCountView, APIRequestDurationView, + + GraphsyncReceivingPeersCountView, + GraphsyncReceivingActiveCountView, + GraphsyncReceivingCountCountView, + GraphsyncReceivingTotalMemoryAllocatedView, + GraphsyncReceivingTotalPendingAllocationsView, + GraphsyncReceivingPeersPendingView, + GraphsyncSendingPeersCountView, + GraphsyncSendingActiveCountView, + GraphsyncSendingCountCountView, + GraphsyncSendingTotalMemoryAllocatedView, + GraphsyncSendingTotalPendingAllocationsView, + GraphsyncSendingPeersPendingView, } views = append(views, blockstore.DefaultViews...) views = append(views, rpcmetrics.DefaultViews...) diff --git a/node/modules/graphsync.go b/node/modules/graphsync.go index 839508900..079bc7ba5 100644 --- a/node/modules/graphsync.go +++ b/node/modules/graphsync.go @@ -1,13 +1,17 @@ package modules import ( + "context" + "github.com/filecoin-project/lotus/metrics" "github.com/ipfs/go-graphsync" graphsyncimpl "github.com/ipfs/go-graphsync/impl" gsnet "github.com/ipfs/go-graphsync/network" "github.com/ipfs/go-graphsync/storeutil" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" + "go.opencensus.io/stats" "go.uber.org/fx" + "time" "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/modules/dtypes" @@ -49,6 +53,44 @@ func Graphsync(parallelTransfersForStorage uint64, parallelTransfersForRetrieval hookActions.UsePersistenceOption("chainstore") } }) + + stopStats := make(chan struct{}) + lc.Append(fx.Hook{ + OnStart: func(context.Context) error { + go func() { + t := time.NewTicker(10 * time.Second) + for { + select { + case <-t.C: + + st := gs.Stats() + stats.Record(mctx, metrics.GraphsyncReceivingPeersCount.M(int64(st.OutgoingRequests.TotalPeers))) + stats.Record(mctx, metrics.GraphsyncReceivingActiveCount.M(int64(st.OutgoingRequests.Active))) + stats.Record(mctx, metrics.GraphsyncReceivingCountCount.M(int64(st.OutgoingRequests.Pending))) + stats.Record(mctx, metrics.GraphsyncReceivingTotalMemoryAllocated.M(int64(st.IncomingResponses.TotalAllocatedAllPeers))) + stats.Record(mctx, metrics.GraphsyncReceivingTotalPendingAllocations.M(int64(st.IncomingResponses.TotalPendingAllocations))) + stats.Record(mctx, metrics.GraphsyncReceivingPeersPending.M(int64(st.IncomingResponses.NumPeersWithPendingAllocations))) + stats.Record(mctx, metrics.GraphsyncSendingPeersCount.M(int64(st.IncomingRequests.TotalPeers))) + stats.Record(mctx, metrics.GraphsyncSendingActiveCount.M(int64(st.IncomingRequests.Active))) + stats.Record(mctx, metrics.GraphsyncSendingCountCount.M(int64(st.IncomingRequests.Pending))) + stats.Record(mctx, metrics.GraphsyncSendingTotalMemoryAllocated.M(int64(st.OutgoingResponses.TotalAllocatedAllPeers))) + stats.Record(mctx, metrics.GraphsyncSendingTotalPendingAllocations.M(int64(st.OutgoingResponses.TotalPendingAllocations))) + stats.Record(mctx, metrics.GraphsyncSendingPeersPending.M(int64(st.OutgoingResponses.NumPeersWithPendingAllocations))) + + case <-stopStats: + return + } + } + }() + + return nil + }, + OnStop: func(ctx context.Context) error { + close(stopStats) + return nil + }, + }) + return gs, nil } }