Add metrics to dagstore piecereader
parent 9d143426ee
commit dad9190142
markets/dagstore/piecereader.go
@@ -2,6 +2,8 @@ package dagstore
 
 import (
 	"context"
+	"github.com/filecoin-project/lotus/metrics"
+	"go.opencensus.io/stats"
 	"io"
 
 	"github.com/ipfs/go-cid"
@@ -29,6 +31,8 @@ type pieceReader struct {
 }
 
 func (p *pieceReader) init() (_ *pieceReader, err error) {
+	stats.Record(p.ctx, metrics.DagStorePRInitCount.M(1))
+
 	p.rAt = 0
 	p.r, p.len, err = p.api.FetchUnsealedPiece(p.ctx, p.pieceCid, uint64(p.rAt))
 	if err != nil {
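Note: the Record call above is the standard opencensus pattern this commit leans on throughout: a measure is declared once, a view aggregates it, and stats.Record ships measurements to every registered view. A minimal self-contained sketch (illustrative names, not the commit's real measures, which live in the lotus metrics package):

	package main

	import (
		"context"
		"log"

		"go.opencensus.io/stats"
		"go.opencensus.io/stats/view"
	)

	var (
		initCount     = stats.Int64("example/init_count", "init count", stats.UnitDimensionless)
		initCountView = &view.View{Measure: initCount, Aggregation: view.Count()}
	)

	func main() {
		// A measure is inert until a view aggregating it is registered.
		if err := view.Register(initCountView); err != nil {
			log.Fatal(err)
		}
		// M wraps the value in a Measurement; Record hands it to all views.
		stats.Record(context.Background(), initCount.M(1))
	}

Until the matching view (added to metrics/metrics.go below) is registered, a Record call like the one in init() is effectively a no-op, so it is cheap on this path.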
@@ -95,6 +99,8 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) {
 		return 0, err
 	}
 
+	stats.Record(p.ctx, metrics.DagStorePRBytesRequested.M(int64(len(b))))
+
 	// 1. Get the backing reader into the correct position
 
 	// if the backing reader is ahead of the offset we want, or more than
@@ -109,6 +115,12 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) {
 
 		log.Debugw("pieceReader new stream", "piece", p.pieceCid, "at", p.rAt, "off", off-p.rAt)
 
+		if off > p.rAt {
+			stats.Record(p.ctx, metrics.DagStorePRSeekForwardBytes.M(off-p.rAt), metrics.DagStorePRSeekForwardCount.M(1))
+		} else {
+			stats.Record(p.ctx, metrics.DagStorePRSeekBackBytes.M(p.rAt-off), metrics.DagStorePRSeekBackCount.M(1))
+		}
+
 		p.rAt = off
 		p.r, _, err = p.api.FetchUnsealedPiece(p.ctx, p.pieceCid, uint64(p.rAt))
 		if err != nil {
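Note: worked through with numbers, with the backing reader at p.rAt = 1024, a ReadAt at off = 4096 that forces a new stream records a 3072-byte forward seek plus one forward-seek count, while off = 512 records a 512-byte back seek plus one back-seek count. A hypothetical helper restating the inlined logic above (not part of the commit, which keeps it inline in ReadAt):

	package dagstore

	import (
		"context"

		"go.opencensus.io/stats"

		"github.com/filecoin-project/lotus/metrics"
	)

	// recordSeek classifies a stream re-open as a forward or a backward
	// seek relative to the current reader offset rAt.
	func recordSeek(ctx context.Context, rAt, off int64) {
		if off > rAt {
			stats.Record(ctx, metrics.DagStorePRSeekForwardBytes.M(off-rAt), metrics.DagStorePRSeekForwardCount.M(1))
		} else {
			stats.Record(ctx, metrics.DagStorePRSeekBackBytes.M(rAt-off), metrics.DagStorePRSeekBackCount.M(1))
		}
	}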
@@ -118,6 +130,8 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) {
 
 	// 2. Check if we need to burn some bytes
 	if off > p.rAt {
+		stats.Record(p.ctx, metrics.DagStorePRBytesDiscarded.M(off-p.rAt), metrics.DagStorePRDiscardCount.M(1))
+
 		n, err := io.CopyN(io.Discard, p.r, off-p.rAt)
 		p.rAt += n
 		if err != nil {
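Note: the discard metrics piggyback on the existing burn loop, where io.CopyN into io.Discard advances the stream without buffering the skipped bytes anywhere. A standalone sketch of that pattern with a toy reader (not the piece API):

	package main

	import (
		"fmt"
		"io"
		"strings"
	)

	func main() {
		r := strings.NewReader("0123456789")
		// Advance the reader by 4 bytes, throwing the data away.
		burned, _ := io.CopyN(io.Discard, r, 4)
		rest, _ := io.ReadAll(r)
		fmt.Println(burned, string(rest)) // 4 456789
	}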
metrics/metrics.go
@@ -128,6 +128,15 @@ var (
 	StorageLimitUsedBytes = stats.Int64("storage/path_limit_used_bytes", "used optional storage limit bytes", stats.UnitBytes)
 	StorageLimitMaxBytes  = stats.Int64("storage/path_limit_max_bytes", "optional storage limit", stats.UnitBytes)
 
+	DagStorePRInitCount        = stats.Int64("dagstore/pr_init_count", "PieceReader init count", stats.UnitDimensionless)
+	DagStorePRBytesRequested   = stats.Int64("dagstore/pr_requested_bytes", "PieceReader requested bytes", stats.UnitBytes)
+	DagStorePRBytesDiscarded   = stats.Int64("dagstore/pr_discarded_bytes", "PieceReader discarded bytes", stats.UnitBytes)
+	DagStorePRDiscardCount     = stats.Int64("dagstore/pr_discard_count", "PieceReader discard count", stats.UnitDimensionless)
+	DagStorePRSeekBackCount    = stats.Int64("dagstore/pr_seek_back_count", "PieceReader seek back count", stats.UnitDimensionless)
+	DagStorePRSeekForwardCount = stats.Int64("dagstore/pr_seek_forward_count", "PieceReader seek forward count", stats.UnitDimensionless)
+	DagStorePRSeekBackBytes    = stats.Int64("dagstore/pr_seek_back_bytes", "PieceReader seek back bytes", stats.UnitBytes)
+	DagStorePRSeekForwardBytes = stats.Int64("dagstore/pr_seek_forward_bytes", "PieceReader seek forward bytes", stats.UnitBytes)
+
 	// splitstore
 	SplitstoreMiss                  = stats.Int64("splitstore/miss", "Number of misses in hotstre access", stats.UnitDimensionless)
 	SplitstoreCompactionTimeSeconds = stats.Float64("splitstore/compaction_time", "Compaction time in seconds", stats.UnitSeconds)
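Note: the name/description/unit triple is all an exporter ever sees of a measure; the counts are declared UnitDimensionless and the sizes UnitBytes, which lines up with the Count/Sum aggregations chosen in the views below. A quick illustrative check (stand-in variable, same constructor as above):

	package main

	import (
		"fmt"

		"go.opencensus.io/stats"
	)

	func main() {
		m := stats.Int64("dagstore/pr_requested_bytes", "PieceReader requested bytes", stats.UnitBytes)
		fmt.Println(m.Name(), m.Description(), m.Unit()) // ... By
	}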
@@ -383,6 +392,39 @@ var (
 		TagKeys:     []tag.Key{StorageID},
 	}
 
+	DagStorePRInitCountView = &view.View{
+		Measure:     DagStorePRInitCount,
+		Aggregation: view.Count(),
+	}
+	DagStorePRBytesRequestedView = &view.View{
+		Measure:     DagStorePRBytesRequested,
+		Aggregation: view.Sum(),
+	}
+	DagStorePRBytesDiscardedView = &view.View{
+		Measure:     DagStorePRBytesDiscarded,
+		Aggregation: view.Sum(),
+	}
+	DagStorePRDiscardCountView = &view.View{
+		Measure:     DagStorePRDiscardCount,
+		Aggregation: view.Count(),
+	}
+	DagStorePRSeekBackCountView = &view.View{
+		Measure:     DagStorePRSeekBackCount,
+		Aggregation: view.Count(),
+	}
+	DagStorePRSeekForwardCountView = &view.View{
+		Measure:     DagStorePRSeekForwardCount,
+		Aggregation: view.Count(),
+	}
+	DagStorePRSeekBackBytesView = &view.View{
+		Measure:     DagStorePRSeekBackBytes,
+		Aggregation: view.Sum(),
+	}
+	DagStorePRSeekForwardBytesView = &view.View{
+		Measure:     DagStorePRSeekForwardBytes,
+		Aggregation: view.Sum(),
+	}
+
 	// splitstore
 	SplitstoreMissView = &view.View{
 		Measure:     SplitstoreMiss,
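Note: the aggregations mirror the measures: view.Count() for the *_count measures (how many inits, seeks, discards) and view.Sum() for the byte measures (running totals of bytes requested, discarded, or skipped). Since none of these views set an explicit Name, each registers under its measure's name, e.g. "dagstore/pr_requested_bytes". A self-contained sketch of the Sum behavior, with an illustrative measure rather than the real ones:

	package main

	import (
		"context"
		"fmt"
		"log"

		"go.opencensus.io/stats"
		"go.opencensus.io/stats/view"
	)

	var (
		reqBytes = stats.Int64("example/requested_bytes", "requested bytes", stats.UnitBytes)
		reqView  = &view.View{Measure: reqBytes, Aggregation: view.Sum()}
	)

	func main() {
		if err := view.Register(reqView); err != nil {
			log.Fatal(err)
		}
		stats.Record(context.Background(), reqBytes.M(4096))
		stats.Record(context.Background(), reqBytes.M(1024))

		// An unnamed view is retrievable under its measure's name.
		rows, err := view.RetrieveData("example/requested_bytes")
		if err != nil {
			log.Fatal(err)
		}
		fmt.Println(rows[0].Data.(*view.SumData).Data) // 5120
	}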
@@ -539,6 +581,14 @@ var MinerNodeViews = append([]*view.View{
 	StorageReservedBytesView,
 	StorageLimitUsedBytesView,
 	StorageLimitMaxBytesView,
+	DagStorePRInitCountView,
+	DagStorePRBytesRequestedView,
+	DagStorePRBytesDiscardedView,
+	DagStorePRDiscardCountView,
+	DagStorePRSeekBackCountView,
+	DagStorePRSeekForwardCountView,
+	DagStorePRSeekBackBytesView,
+	DagStorePRSeekForwardBytesView,
 }, DefaultViews...)
 
 // SinceInMilliseconds returns the duration of time since the provide time as a float64.
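Note: appending the views to MinerNodeViews is what makes them live, since that is the slice the miner process registers at startup. A minimal sketch of how the new metrics would then surface, assuming the stock opencensus Prometheus exporter (the HTTP wiring here is illustrative, not part of this commit):

	package main

	import (
		"log"
		"net/http"

		"contrib.go.opencensus.io/exporter/prometheus"
		"go.opencensus.io/stats/view"

		"github.com/filecoin-project/lotus/metrics"
	)

	func main() {
		// Register the full miner view set, including the new
		// DagStorePR* views appended above.
		if err := view.Register(metrics.MinerNodeViews...); err != nil {
			log.Fatal(err)
		}

		// Expose aggregated view data for scraping under /metrics.
		pe, err := prometheus.NewExporter(prometheus.Options{Namespace: "lotus"})
		if err != nil {
			log.Fatal(err)
		}
		http.Handle("/metrics", pe)
		log.Fatal(http.ListenAndServe(":8080", nil))
	}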