Collect and expose graphsync metrics

Łukasz Magiera 2021-10-19 19:19:52 +02:00
parent b1c06ece85
commit 9a993d25d0
2 changed files with 121 additions and 0 deletions

View File

@@ -55,6 +55,22 @@ var (
PeerCount = stats.Int64("peer/count", "Current number of FIL peers", stats.UnitDimensionless)
APIRequestDuration = stats.Float64("api/request_duration_ms", "Duration of API requests", stats.UnitMilliseconds)
// graphsync
GraphsyncReceivingPeersCount = stats.Int64("graphsync/receiving_peers", "number of peers we are receiving graphsync data from", stats.UnitDimensionless)
GraphsyncReceivingActiveCount = stats.Int64("graphsync/receiving_active", "number of active receiving graphsync transfers", stats.UnitDimensionless)
GraphsyncReceivingCountCount = stats.Int64("graphsync/receiving_pending", "number of pending receiving graphsync transfers", stats.UnitDimensionless)
GraphsyncReceivingTotalMemoryAllocated = stats.Int64("graphsync/receiving_total_allocated", "amount of block memory allocated for receiving graphsync data", stats.UnitDimensionless)
GraphsyncReceivingTotalPendingAllocations = stats.Int64("graphsync/receiving_pending_allocations", "amount of block memory on hold while being received, pending allocation", stats.UnitDimensionless)
GraphsyncReceivingPeersPending = stats.Int64("graphsync/receiving_peers_pending", "number of peers we can't receive more data from because of pending allocations", stats.UnitDimensionless)
GraphsyncSendingPeersCount = stats.Int64("graphsync/sending_peers", "number of peers we are sending graphsync data to", stats.UnitDimensionless)
GraphsyncSendingActiveCount = stats.Int64("graphsync/sending_active", "number of active sending graphsync transfers", stats.UnitDimensionless)
GraphsyncSendingCountCount = stats.Int64("graphsync/sending_pending", "number of pending sending graphsync transfers", stats.UnitDimensionless)
GraphsyncSendingTotalMemoryAllocated = stats.Int64("graphsync/sending_total_allocated", "amount of block memory allocated for sending graphsync data", stats.UnitDimensionless)
GraphsyncSendingTotalPendingAllocations = stats.Int64("graphsync/sending_pending_allocations", "amount of block memory on hold from sending, pending allocation", stats.UnitDimensionless)
GraphsyncSendingPeersPending = stats.Int64("graphsync/sending_peers_pending", "number of peers we can't send more data to because of pending allocations", stats.UnitDimensionless)
// chain
ChainNodeHeight = stats.Int64("chain/node_height", "Current Height of the node", stats.UnitDimensionless)
ChainNodeHeightExpected = stats.Int64("chain/node_height_expected", "Expected Height of the node", stats.UnitDimensionless)
@@ -380,6 +396,56 @@ var (
Measure: SplitstoreCompactionDead,
Aggregation: view.Sum(),
}
// graphsync
GraphsyncReceivingPeersCountView = &view.View{
Measure: GraphsyncReceivingPeersCount,
Aggregation: view.LastValue(),
}
GraphsyncReceivingActiveCountView = &view.View{
Measure: GraphsyncReceivingActiveCount,
Aggregation: view.LastValue(),
}
GraphsyncReceivingCountCountView = &view.View{
Measure: GraphsyncReceivingCountCount,
Aggregation: view.LastValue(),
}
GraphsyncReceivingTotalMemoryAllocatedView = &view.View{
Measure: GraphsyncReceivingTotalMemoryAllocated,
Aggregation: view.LastValue(),
}
GraphsyncReceivingTotalPendingAllocationsView = &view.View{
Measure: GraphsyncReceivingTotalPendingAllocations,
Aggregation: view.LastValue(),
}
GraphsyncReceivingPeersPendingView = &view.View{
Measure: GraphsyncReceivingPeersPending,
Aggregation: view.LastValue(),
}
GraphsyncSendingPeersCountView = &view.View{
Measure: GraphsyncSendingPeersCount,
Aggregation: view.LastValue(),
}
GraphsyncSendingActiveCountView = &view.View{
Measure: GraphsyncSendingActiveCount,
Aggregation: view.LastValue(),
}
GraphsyncSendingCountCountView = &view.View{
Measure: GraphsyncSendingCountCount,
Aggregation: view.LastValue(),
}
GraphsyncSendingTotalMemoryAllocatedView = &view.View{
Measure: GraphsyncSendingTotalMemoryAllocated,
Aggregation: view.LastValue(),
}
GraphsyncSendingTotalPendingAllocationsView = &view.View{
Measure: GraphsyncSendingTotalPendingAllocations,
Aggregation: view.LastValue(),
}
GraphsyncSendingPeersPendingView = &view.View{
Measure: GraphsyncSendingPeersPending,
Aggregation: view.LastValue(),
}
)
// DefaultViews is an array of OpenCensus views for metric gathering purposes
@@ -388,6 +454,19 @@ var DefaultViews = func() []*view.View {
InfoView,
PeerCountView,
APIRequestDurationView,
GraphsyncReceivingPeersCountView,
GraphsyncReceivingActiveCountView,
GraphsyncReceivingCountCountView,
GraphsyncReceivingTotalMemoryAllocatedView,
GraphsyncReceivingTotalPendingAllocationsView,
GraphsyncReceivingPeersPendingView,
GraphsyncSendingPeersCountView,
GraphsyncSendingActiveCountView,
GraphsyncSendingCountCountView,
GraphsyncSendingTotalMemoryAllocatedView,
GraphsyncSendingTotalPendingAllocationsView,
GraphsyncSendingPeersPendingView,
}
views = append(views, blockstore.DefaultViews...)
views = append(views, rpcmetrics.DefaultViews...)
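
All of the new views use view.LastValue(), so each graphsync metric is exported as a gauge holding the most recently recorded sample. Below is a minimal sketch of how the updated DefaultViews could be registered and scraped, assuming the OpenCensus Prometheus exporter (contrib.go.opencensus.io/exporter/prometheus); the endpoint path and port are illustrative, not part of this commit:

package main

import (
	"log"
	"net/http"

	"contrib.go.opencensus.io/exporter/prometheus"
	"go.opencensus.io/stats/view"

	"github.com/filecoin-project/lotus/metrics"
)

func main() {
	// Register all default Lotus views, including the new graphsync gauges.
	if err := view.Register(metrics.DefaultViews...); err != nil {
		log.Fatal(err)
	}

	// Expose the recorded values in Prometheus exposition format.
	exporter, err := prometheus.NewExporter(prometheus.Options{Namespace: "lotus"})
	if err != nil {
		log.Fatal(err)
	}
	http.Handle("/debug/metrics", exporter)
	log.Fatal(http.ListenAndServe(":1234", nil))
}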

View File

@@ -1,13 +1,17 @@
package modules
import (
"context"
"github.com/filecoin-project/lotus/metrics"
"github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync"
graphsyncimpl "github.com/ipfs/go-graphsync/impl" graphsyncimpl "github.com/ipfs/go-graphsync/impl"
gsnet "github.com/ipfs/go-graphsync/network" gsnet "github.com/ipfs/go-graphsync/network"
"github.com/ipfs/go-graphsync/storeutil" "github.com/ipfs/go-graphsync/storeutil"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"go.opencensus.io/stats"
"go.uber.org/fx" "go.uber.org/fx"
"time"
"github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
@ -49,6 +53,44 @@ func Graphsync(parallelTransfersForStorage uint64, parallelTransfersForRetrieval
hookActions.UsePersistenceOption("chainstore") hookActions.UsePersistenceOption("chainstore")
} }
}) })
stopStats := make(chan struct{})
lc.Append(fx.Hook{
OnStart: func(context.Context) error {
go func() {
t := time.NewTicker(10 * time.Second)
for {
select {
case <-t.C:
st := gs.Stats()
stats.Record(mctx, metrics.GraphsyncReceivingPeersCount.M(int64(st.OutgoingRequests.TotalPeers)))
stats.Record(mctx, metrics.GraphsyncReceivingActiveCount.M(int64(st.OutgoingRequests.Active)))
stats.Record(mctx, metrics.GraphsyncReceivingCountCount.M(int64(st.OutgoingRequests.Pending)))
stats.Record(mctx, metrics.GraphsyncReceivingTotalMemoryAllocated.M(int64(st.IncomingResponses.TotalAllocatedAllPeers)))
stats.Record(mctx, metrics.GraphsyncReceivingTotalPendingAllocations.M(int64(st.IncomingResponses.TotalPendingAllocations)))
stats.Record(mctx, metrics.GraphsyncReceivingPeersPending.M(int64(st.IncomingResponses.NumPeersWithPendingAllocations)))
stats.Record(mctx, metrics.GraphsyncSendingPeersCount.M(int64(st.IncomingRequests.TotalPeers)))
stats.Record(mctx, metrics.GraphsyncSendingActiveCount.M(int64(st.IncomingRequests.Active)))
stats.Record(mctx, metrics.GraphsyncSendingCountCount.M(int64(st.IncomingRequests.Pending)))
stats.Record(mctx, metrics.GraphsyncSendingTotalMemoryAllocated.M(int64(st.OutgoingResponses.TotalAllocatedAllPeers)))
stats.Record(mctx, metrics.GraphsyncSendingTotalPendingAllocations.M(int64(st.OutgoingResponses.TotalPendingAllocations)))
stats.Record(mctx, metrics.GraphsyncSendingPeersPending.M(int64(st.OutgoingResponses.NumPeersWithPendingAllocations)))
case <-stopStats:
return
}
}
}()
return nil
},
OnStop: func(ctx context.Context) error {
close(stopStats)
return nil
},
})
return gs, nil
}
}
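
The hook above polls gs.Stats() every 10 seconds and records the snapshot into the new measures; note that OutgoingRequests feed the "receiving" metrics (requests this node sends out pull data toward it), while IncomingRequests feed the "sending" metrics. Below is a minimal sketch of the same lifecycle-driven polling pattern in isolation, using a hypothetical SampleGauge measure and an explicit ticker stop on shutdown (the explicit stop is an assumption, not part of this commit):

package example

import (
	"context"
	"time"

	"go.opencensus.io/stats"
	"go.uber.org/fx"
)

// SampleGauge is a stand-in for one of the graphsync measures above.
var SampleGauge = stats.Int64("example/sample_gauge", "example gauge polled on a ticker", stats.UnitDimensionless)

// pollStats starts a goroutine on fx startup that records a value every
// 10 seconds and exits cleanly when the application shuts down.
func pollStats(lc fx.Lifecycle, read func() int64) {
	stop := make(chan struct{})
	lc.Append(fx.Hook{
		OnStart: func(context.Context) error {
			go func() {
				t := time.NewTicker(10 * time.Second)
				defer t.Stop() // release the ticker when the loop exits (assumption, not in the commit)
				for {
					select {
					case <-t.C:
						stats.Record(context.Background(), SampleGauge.M(read()))
					case <-stop:
						return
					}
				}
			}()
			return nil
		},
		OnStop: func(context.Context) error {
			close(stop)
			return nil
		},
	})
}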