diff --git a/markets/dagstore/piecereader.go b/markets/dagstore/piecereader.go
index 6ef69dfbe..f9ba881f5 100644
--- a/markets/dagstore/piecereader.go
+++ b/markets/dagstore/piecereader.go
@@ -2,6 +2,8 @@ package dagstore
 
 import (
 	"context"
+	"github.com/filecoin-project/lotus/metrics"
+	"go.opencensus.io/stats"
 	"io"
 
 	"github.com/ipfs/go-cid"
@@ -29,6 +31,8 @@ type pieceReader struct {
 }
 
 func (p *pieceReader) init() (_ *pieceReader, err error) {
+	stats.Record(p.ctx, metrics.DagStorePRInitCount.M(1))
+
 	p.rAt = 0
 	p.r, p.len, err = p.api.FetchUnsealedPiece(p.ctx, p.pieceCid, uint64(p.rAt))
 	if err != nil {
@@ -95,6 +99,8 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) {
 		return 0, err
 	}
 
+	stats.Record(p.ctx, metrics.DagStorePRBytesRequested.M(int64(len(b))))
+
 	// 1. Get the backing reader into the correct position
 
 	// if the backing reader is ahead of the offset we want, or more than
@@ -109,6 +115,12 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) {
 
 		log.Debugw("pieceReader new stream", "piece", p.pieceCid, "at", p.rAt, "off", off-p.rAt)
 
+		if off > p.rAt {
+			stats.Record(p.ctx, metrics.DagStorePRSeekForwardBytes.M(off-p.rAt), metrics.DagStorePRSeekForwardCount.M(1))
+		} else {
+			stats.Record(p.ctx, metrics.DagStorePRSeekBackBytes.M(p.rAt-off), metrics.DagStorePRSeekBackCount.M(1))
+		}
+
 		p.rAt = off
 		p.r, _, err = p.api.FetchUnsealedPiece(p.ctx, p.pieceCid, uint64(p.rAt))
 		if err != nil {
@@ -118,6 +130,8 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) {
 
 	// 2. Check if we need to burn some bytes
 	if off > p.rAt {
+		stats.Record(p.ctx, metrics.DagStorePRBytesDiscarded.M(off-p.rAt), metrics.DagStorePRDiscardCount.M(1))
+
 		n, err := io.CopyN(io.Discard, p.r, off-p.rAt)
 		p.rAt += n
 		if err != nil {
diff --git a/metrics/metrics.go b/metrics/metrics.go
index ddd149d8d..b4032bb1d 100644
--- a/metrics/metrics.go
+++ b/metrics/metrics.go
@@ -128,6 +128,15 @@ var (
 	StorageLimitUsedBytes = stats.Int64("storage/path_limit_used_bytes", "used optional storage limit bytes", stats.UnitBytes)
 	StorageLimitMaxBytes  = stats.Int64("storage/path_limit_max_bytes", "optional storage limit", stats.UnitBytes)
 
+	DagStorePRInitCount        = stats.Int64("dagstore/pr_init_count", "PieceReader init count", stats.UnitDimensionless)
+	DagStorePRBytesRequested   = stats.Int64("dagstore/pr_requested_bytes", "PieceReader requested bytes", stats.UnitBytes)
+	DagStorePRBytesDiscarded   = stats.Int64("dagstore/pr_discarded_bytes", "PieceReader discarded bytes", stats.UnitBytes)
+	DagStorePRDiscardCount     = stats.Int64("dagstore/pr_discard_count", "PieceReader discard count", stats.UnitDimensionless)
+	DagStorePRSeekBackCount    = stats.Int64("dagstore/pr_seek_back_count", "PieceReader seek back count", stats.UnitDimensionless)
+	DagStorePRSeekForwardCount = stats.Int64("dagstore/pr_seek_forward_count", "PieceReader seek forward count", stats.UnitDimensionless)
+	DagStorePRSeekBackBytes    = stats.Int64("dagstore/pr_seek_back_bytes", "PieceReader seek back bytes", stats.UnitBytes)
+	DagStorePRSeekForwardBytes = stats.Int64("dagstore/pr_seek_forward_bytes", "PieceReader seek forward bytes", stats.UnitBytes)
+
 	// splitstore
 	SplitstoreMiss                  = stats.Int64("splitstore/miss", "Number of misses in hotstre access", stats.UnitDimensionless)
 	SplitstoreCompactionTimeSeconds = stats.Float64("splitstore/compaction_time", "Compaction time in seconds", stats.UnitSeconds)
@@ -383,6 +392,39 @@ var (
 		TagKeys:     []tag.Key{StorageID},
 	}
 
+	DagStorePRInitCountView = &view.View{
+		Measure:     DagStorePRInitCount,
+		Aggregation: view.Count(),
+	}
+	DagStorePRBytesRequestedView = &view.View{
+		Measure:     DagStorePRBytesRequested,
+		Aggregation: view.Sum(),
+	}
+	DagStorePRBytesDiscardedView = &view.View{
+		Measure:     DagStorePRBytesDiscarded,
+		Aggregation: view.Sum(),
+	}
+	DagStorePRDiscardCountView = &view.View{
+		Measure:     DagStorePRDiscardCount,
+		Aggregation: view.Count(),
+	}
+	DagStorePRSeekBackCountView = &view.View{
+		Measure:     DagStorePRSeekBackCount,
+		Aggregation: view.Count(),
+	}
+	DagStorePRSeekForwardCountView = &view.View{
+		Measure:     DagStorePRSeekForwardCount,
+		Aggregation: view.Count(),
+	}
+	DagStorePRSeekBackBytesView = &view.View{
+		Measure:     DagStorePRSeekBackBytes,
+		Aggregation: view.Sum(),
+	}
+	DagStorePRSeekForwardBytesView = &view.View{
+		Measure:     DagStorePRSeekForwardBytes,
+		Aggregation: view.Sum(),
+	}
+
 	// splitstore
 	SplitstoreMissView = &view.View{
 		Measure:     SplitstoreMiss,
@@ -539,6 +581,14 @@ var MinerNodeViews = append([]*view.View{
 	StorageReservedBytesView,
 	StorageLimitUsedBytesView,
 	StorageLimitMaxBytesView,
+	DagStorePRInitCountView,
+	DagStorePRBytesRequestedView,
+	DagStorePRBytesDiscardedView,
+	DagStorePRDiscardCountView,
+	DagStorePRSeekBackCountView,
+	DagStorePRSeekForwardCountView,
+	DagStorePRSeekBackBytesView,
+	DagStorePRSeekForwardBytesView,
 }, DefaultViews...)
 
 // SinceInMilliseconds returns the duration of time since the provide time as a float64.
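
For reference, a minimal standalone sketch of how the OpenCensus measure/view pair used in this diff fits together. The `example*` names are illustrative only (not part of this change); the real measures live in `metrics/metrics.go` and are registered via `MinerNodeViews`:

```go
package main

import (
	"context"
	"log"

	"go.opencensus.io/stats"
	"go.opencensus.io/stats/view"
)

// exampleBytes mirrors the shape of DagStorePRBytesRequested: an Int64
// measure with a name, a description, and a unit.
var exampleBytes = stats.Int64("example/requested_bytes", "example requested bytes", stats.UnitBytes)

// exampleBytesView aggregates the measure with view.Sum(), so exporters see
// a running total of all recorded values; view.Count() (used for the
// *_count measures above) would report the number of Record calls instead.
var exampleBytesView = &view.View{
	Measure:     exampleBytes,
	Aggregation: view.Sum(),
}

func main() {
	// Data recorded before a view is registered is dropped, which is why the
	// new views are appended to MinerNodeViews (registered at node startup).
	if err := view.Register(exampleBytesView); err != nil {
		log.Fatal(err)
	}

	// Same shape as the stats.Record calls added to piecereader.go.
	stats.Record(context.Background(), exampleBytes.M(1024))
}
```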