Add metrics to dagstore piecereader
This commit is contained in:
parent
fbd3c90b80
commit
9725d72a80
@ -2,6 +2,8 @@ package dagstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/filecoin-project/lotus/metrics"
|
||||
"go.opencensus.io/stats"
|
||||
"io"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
@ -29,6 +31,8 @@ type pieceReader struct {
|
||||
}
|
||||
|
||||
func (p *pieceReader) init() (_ *pieceReader, err error) {
|
||||
stats.Record(p.ctx, metrics.DagStorePRInitCount.M(1))
|
||||
|
||||
p.rAt = 0
|
||||
p.r, p.len, err = p.api.FetchUnsealedPiece(p.ctx, p.pieceCid, uint64(p.rAt))
|
||||
if err != nil {
|
||||
@ -95,6 +99,8 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
stats.Record(p.ctx, metrics.DagStorePRBytesRequested.M(int64(len(b))))
|
||||
|
||||
// 1. Get the backing reader into the correct position
|
||||
|
||||
// if the backing reader is ahead of the offset we want, or more than
|
||||
@ -109,6 +115,12 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) {
|
||||
|
||||
log.Debugw("pieceReader new stream", "piece", p.pieceCid, "at", p.rAt, "off", off-p.rAt)
|
||||
|
||||
if off > p.rAt {
|
||||
stats.Record(p.ctx, metrics.DagStorePRSeekForwardBytes.M(off-p.rAt), metrics.DagStorePRSeekForwardCount.M(1))
|
||||
} else {
|
||||
stats.Record(p.ctx, metrics.DagStorePRSeekBackBytes.M(p.rAt-off), metrics.DagStorePRSeekBackCount.M(1))
|
||||
}
|
||||
|
||||
p.rAt = off
|
||||
p.r, _, err = p.api.FetchUnsealedPiece(p.ctx, p.pieceCid, uint64(p.rAt))
|
||||
if err != nil {
|
||||
@ -118,6 +130,8 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) {
|
||||
|
||||
// 2. Check if we need to burn some bytes
|
||||
if off > p.rAt {
|
||||
stats.Record(p.ctx, metrics.DagStorePRBytesDiscarded.M(off-p.rAt), metrics.DagStorePRDiscardCount.M(1))
|
||||
|
||||
n, err := io.CopyN(io.Discard, p.r, off-p.rAt)
|
||||
p.rAt += n
|
||||
if err != nil {
|
||||
|
@ -128,6 +128,15 @@ var (
|
||||
StorageLimitUsedBytes = stats.Int64("storage/path_limit_used_bytes", "used optional storage limit bytes", stats.UnitBytes)
|
||||
StorageLimitMaxBytes = stats.Int64("storage/path_limit_max_bytes", "optional storage limit", stats.UnitBytes)
|
||||
|
||||
DagStorePRInitCount = stats.Int64("dagstore/pr_init_count", "PieceReader init count", stats.UnitDimensionless)
|
||||
DagStorePRBytesRequested = stats.Int64("dagstore/pr_requested_bytes", "PieceReader requested bytes", stats.UnitBytes)
|
||||
DagStorePRBytesDiscarded = stats.Int64("dagstore/pr_discarded_bytes", "PieceReader discarded bytes", stats.UnitBytes)
|
||||
DagStorePRDiscardCount = stats.Int64("dagstore/pr_discard_count", "PieceReader discard count", stats.UnitDimensionless)
|
||||
DagStorePRSeekBackCount = stats.Int64("dagstore/pr_seek_back_count", "PieceReader seek back count", stats.UnitDimensionless)
|
||||
DagStorePRSeekForwardCount = stats.Int64("dagstore/pr_seek_forward_count", "PieceReader seek forward count", stats.UnitDimensionless)
|
||||
DagStorePRSeekBackBytes = stats.Int64("dagstore/pr_seek_back_bytes", "PieceReader seek back bytes", stats.UnitBytes)
|
||||
DagStorePRSeekForwardBytes = stats.Int64("dagstore/pr_seek_forward_bytes", "PieceReader seek forward bytes", stats.UnitBytes)
|
||||
|
||||
// splitstore
|
||||
SplitstoreMiss = stats.Int64("splitstore/miss", "Number of misses in hotstre access", stats.UnitDimensionless)
|
||||
SplitstoreCompactionTimeSeconds = stats.Float64("splitstore/compaction_time", "Compaction time in seconds", stats.UnitSeconds)
|
||||
@ -383,6 +392,39 @@ var (
|
||||
TagKeys: []tag.Key{StorageID},
|
||||
}
|
||||
|
||||
DagStorePRInitCountView = &view.View{
|
||||
Measure: DagStorePRInitCount,
|
||||
Aggregation: view.Count(),
|
||||
}
|
||||
DagStorePRBytesRequestedView = &view.View{
|
||||
Measure: DagStorePRBytesRequested,
|
||||
Aggregation: view.Sum(),
|
||||
}
|
||||
DagStorePRBytesDiscardedView = &view.View{
|
||||
Measure: DagStorePRBytesDiscarded,
|
||||
Aggregation: view.Sum(),
|
||||
}
|
||||
DagStorePRDiscardCountView = &view.View{
|
||||
Measure: DagStorePRDiscardCount,
|
||||
Aggregation: view.Count(),
|
||||
}
|
||||
DagStorePRSeekBackCountView = &view.View{
|
||||
Measure: DagStorePRSeekBackCount,
|
||||
Aggregation: view.Count(),
|
||||
}
|
||||
DagStorePRSeekForwardCountView = &view.View{
|
||||
Measure: DagStorePRSeekForwardCount,
|
||||
Aggregation: view.Count(),
|
||||
}
|
||||
DagStorePRSeekBackBytesView = &view.View{
|
||||
Measure: DagStorePRSeekBackBytes,
|
||||
Aggregation: view.Sum(),
|
||||
}
|
||||
DagStorePRSeekForwardBytesView = &view.View{
|
||||
Measure: DagStorePRSeekForwardBytes,
|
||||
Aggregation: view.Sum(),
|
||||
}
|
||||
|
||||
// splitstore
|
||||
SplitstoreMissView = &view.View{
|
||||
Measure: SplitstoreMiss,
|
||||
@ -539,6 +581,14 @@ var MinerNodeViews = append([]*view.View{
|
||||
StorageReservedBytesView,
|
||||
StorageLimitUsedBytesView,
|
||||
StorageLimitMaxBytesView,
|
||||
DagStorePRInitCountView,
|
||||
DagStorePRBytesRequestedView,
|
||||
DagStorePRBytesDiscardedView,
|
||||
DagStorePRDiscardCountView,
|
||||
DagStorePRSeekBackCountView,
|
||||
DagStorePRSeekForwardCountView,
|
||||
DagStorePRSeekBackBytesView,
|
||||
DagStorePRSeekForwardBytesView,
|
||||
}, DefaultViews...)
|
||||
|
||||
// SinceInMilliseconds returns the duration of time since the provide time as a float64.
|
||||
|
Loading…
Reference in New Issue
Block a user