Gather graphsync metrics on provider side as well
parent 32a855b984
commit d2e9d21031
@@ -4,15 +4,14 @@ import (
 	"context"
 	"time"
 
-	"go.opencensus.io/stats"
-	"go.uber.org/fx"
-
 	"github.com/ipfs/go-graphsync"
 	graphsyncimpl "github.com/ipfs/go-graphsync/impl"
 	gsnet "github.com/ipfs/go-graphsync/network"
 	"github.com/ipfs/go-graphsync/storeutil"
 	"github.com/libp2p/go-libp2p-core/host"
 	"github.com/libp2p/go-libp2p-core/peer"
+	"go.opencensus.io/stats"
+	"go.uber.org/fx"
 
 	"github.com/filecoin-project/lotus/metrics"
 	"github.com/filecoin-project/lotus/node/config"
@@ -56,43 +55,47 @@ func Graphsync(parallelTransfersForStorage uint64, parallelTransfersForRetrieval
 			}
 		})
 
-		stopStats := make(chan struct{})
-		lc.Append(fx.Hook{
-			OnStart: func(context.Context) error {
-				go func() {
-					t := time.NewTicker(10 * time.Second)
-					for {
-						select {
-						case <-t.C:
-
-							st := gs.Stats()
-							stats.Record(mctx, metrics.GraphsyncReceivingPeersCount.M(int64(st.OutgoingRequests.TotalPeers)))
-							stats.Record(mctx, metrics.GraphsyncReceivingActiveCount.M(int64(st.OutgoingRequests.Active)))
-							stats.Record(mctx, metrics.GraphsyncReceivingCountCount.M(int64(st.OutgoingRequests.Pending)))
-							stats.Record(mctx, metrics.GraphsyncReceivingTotalMemoryAllocated.M(int64(st.IncomingResponses.TotalAllocatedAllPeers)))
-							stats.Record(mctx, metrics.GraphsyncReceivingTotalPendingAllocations.M(int64(st.IncomingResponses.TotalPendingAllocations)))
-							stats.Record(mctx, metrics.GraphsyncReceivingPeersPending.M(int64(st.IncomingResponses.NumPeersWithPendingAllocations)))
-							stats.Record(mctx, metrics.GraphsyncSendingPeersCount.M(int64(st.IncomingRequests.TotalPeers)))
-							stats.Record(mctx, metrics.GraphsyncSendingActiveCount.M(int64(st.IncomingRequests.Active)))
-							stats.Record(mctx, metrics.GraphsyncSendingCountCount.M(int64(st.IncomingRequests.Pending)))
-							stats.Record(mctx, metrics.GraphsyncSendingTotalMemoryAllocated.M(int64(st.OutgoingResponses.TotalAllocatedAllPeers)))
-							stats.Record(mctx, metrics.GraphsyncSendingTotalPendingAllocations.M(int64(st.OutgoingResponses.TotalPendingAllocations)))
-							stats.Record(mctx, metrics.GraphsyncSendingPeersPending.M(int64(st.OutgoingResponses.NumPeersWithPendingAllocations)))
-
-						case <-stopStats:
-							return
-						}
-					}
-				}()
-
-				return nil
-			},
-			OnStop: func(ctx context.Context) error {
-				close(stopStats)
-				return nil
-			},
-		})
+		graphsyncStats(mctx, lc, gs)
 
 		return gs, nil
 	}
 }
+
+func graphsyncStats(mctx helpers.MetricsCtx, lc fx.Lifecycle, gs dtypes.Graphsync) {
+	stopStats := make(chan struct{})
+	lc.Append(fx.Hook{
+		OnStart: func(context.Context) error {
+			go func() {
+				t := time.NewTicker(10 * time.Second)
+				for {
+					select {
+					case <-t.C:
+
+						st := gs.Stats()
+						stats.Record(mctx, metrics.GraphsyncReceivingPeersCount.M(int64(st.OutgoingRequests.TotalPeers)))
+						stats.Record(mctx, metrics.GraphsyncReceivingActiveCount.M(int64(st.OutgoingRequests.Active)))
+						stats.Record(mctx, metrics.GraphsyncReceivingCountCount.M(int64(st.OutgoingRequests.Pending)))
+						stats.Record(mctx, metrics.GraphsyncReceivingTotalMemoryAllocated.M(int64(st.IncomingResponses.TotalAllocatedAllPeers)))
+						stats.Record(mctx, metrics.GraphsyncReceivingTotalPendingAllocations.M(int64(st.IncomingResponses.TotalPendingAllocations)))
+						stats.Record(mctx, metrics.GraphsyncReceivingPeersPending.M(int64(st.IncomingResponses.NumPeersWithPendingAllocations)))
+						stats.Record(mctx, metrics.GraphsyncSendingPeersCount.M(int64(st.IncomingRequests.TotalPeers)))
+						stats.Record(mctx, metrics.GraphsyncSendingActiveCount.M(int64(st.IncomingRequests.Active)))
+						stats.Record(mctx, metrics.GraphsyncSendingCountCount.M(int64(st.IncomingRequests.Pending)))
+						stats.Record(mctx, metrics.GraphsyncSendingTotalMemoryAllocated.M(int64(st.OutgoingResponses.TotalAllocatedAllPeers)))
+						stats.Record(mctx, metrics.GraphsyncSendingTotalPendingAllocations.M(int64(st.OutgoingResponses.TotalPendingAllocations)))
+						stats.Record(mctx, metrics.GraphsyncSendingPeersPending.M(int64(st.OutgoingResponses.NumPeersWithPendingAllocations)))
+
+					case <-stopStats:
+						return
+					}
+				}
+			}()
+
+			return nil
+		},
+		OnStop: func(ctx context.Context) error {
+			close(stopStats)
+			return nil
+		},
+	})
+}
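For context on where these recorded values go: the stats.Record calls above only surface once the measures they reference are declared and aggregated into views somewhere. Below is a minimal sketch of what that plumbing could look like with opencensus. The two identifiers mirror measures used in the diff, but the metric names, descriptions, and view choices are illustrative assumptions, not the actual definitions in github.com/filecoin-project/lotus/metrics.

// A minimal sketch, assuming the measures are plain opencensus Int64 gauges
// sampled by the 10s polling loop above. Names and descriptions here are
// assumptions for illustration only.
package metrics

import (
	"go.opencensus.io/stats"
	"go.opencensus.io/stats/view"
)

var (
	// Two of the measures recorded by graphsyncStats; the rest follow the same pattern.
	GraphsyncReceivingPeersCount = stats.Int64("graphsync/receiving_peers", "peers we are receiving graphsync data from (assumed name)", stats.UnitDimensionless)
	GraphsyncSendingPeersCount   = stats.Int64("graphsync/sending_peers", "peers we are sending graphsync data to (assumed name)", stats.UnitDimensionless)
)

// LastValue views expose the most recent sample from the polling loop to
// whatever exporter the node registers (Prometheus, stdout, etc.).
var GraphsyncViews = []*view.View{
	{Measure: GraphsyncReceivingPeersCount, Aggregation: view.LastValue()},
	{Measure: GraphsyncSendingPeersCount, Aggregation: view.LastValue()},
}

func init() {
	// Registering at init keeps this sketch self-contained; a real node would
	// register its views explicitly during startup.
	_ = view.Register(GraphsyncViews...)
}

A LastValue aggregation fits the gauge-like nature of the polled graphsync counters; a Sum or Count aggregation would inflate the numbers because the loop re-records the full value every tick.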
@@ -408,6 +408,8 @@ func StagingGraphsync(parallelTransfersForStorage uint64, parallelTransfersForRe
 			graphsyncimpl.MaxLinksPerIncomingRequests(config.MaxTraversalLinks),
 			graphsyncimpl.MaxLinksPerOutgoingRequests(config.MaxTraversalLinks))
 
+		graphsyncStats(mctx, lc, gs)
+
 		return gs
 	}
 }