feat: sched: Add metrics around sched cycle

This commit is contained in:
Łukasz Magiera 2022-11-28 18:56:28 +01:00
parent 8fd3875069
commit bc879c5bd8
6 changed files with 55 additions and 7 deletions

View File

@ -14,7 +14,7 @@ import (
)
// Distribution
var defaultMillisecondsDistribution = view.Distribution(0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 3000, 4000, 5000, 7500, 10000, 20000, 50000, 100000)
var defaultMillisecondsDistribution = view.Distribution(0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 3000, 4000, 5000, 7500, 10000, 20000, 50000, 100_000, 250_000, 500_000, 1000_000)
var workMillisecondsDistribution = view.Distribution(
250, 500, 1000, 2000, 5000, 10_000, 30_000, 60_000, 2*60_000, 5*60_000, 10*60_000, 15*60_000, 30*60_000, // short sealing tasks
40*60_000, 45*60_000, 50*60_000, 55*60_000, 60*60_000, 65*60_000, 70*60_000, 75*60_000, 80*60_000, 85*60_000, 100*60_000, 120*60_000, // PC2 / C2 range
@ -136,6 +136,11 @@ var (
StorageLimitUsedBytes = stats.Int64("storage/path_limit_used_bytes", "used optional storage limit bytes", stats.UnitBytes)
StorageLimitMaxBytes = stats.Int64("storage/path_limit_max_bytes", "optional storage limit", stats.UnitBytes)
SchedAssignerCycleDuration = stats.Float64("sched/assigner_cycle_ms", "Duration of scheduler assigner cycle", stats.UnitMilliseconds)
SchedAssignerCandidatesDuration = stats.Float64("sched/assigner_cycle_candidates_ms", "Duration of scheduler assigner candidate matching step", stats.UnitMilliseconds)
SchedAssignerWindowSelectionDuration = stats.Float64("sched/assigner_cycle_window_select_ms", "Duration of scheduler window selection step", stats.UnitMilliseconds)
SchedAssignerSubmitDuration = stats.Float64("sched/assigner_cycle_submit_ms", "Duration of scheduler window submit step", stats.UnitMilliseconds)
DagStorePRInitCount = stats.Int64("dagstore/pr_init_count", "PieceReader init count", stats.UnitDimensionless)
DagStorePRBytesRequested = stats.Int64("dagstore/pr_requested_bytes", "PieceReader requested bytes", stats.UnitBytes)
DagStorePRBytesDiscarded = stats.Int64("dagstore/pr_discarded_bytes", "PieceReader discarded bytes", stats.UnitBytes)
@ -428,6 +433,23 @@ var (
TagKeys: []tag.Key{StorageID},
}
SchedAssignerCycleDurationView = &view.View{
Measure: SchedAssignerCycleDuration,
Aggregation: defaultMillisecondsDistribution,
}
SchedAssignerCandidatesDurationView = &view.View{
Measure: SchedAssignerCandidatesDuration,
Aggregation: defaultMillisecondsDistribution,
}
SchedAssignerWindowSelectionDurationView = &view.View{
Measure: SchedAssignerWindowSelectionDuration,
Aggregation: defaultMillisecondsDistribution,
}
SchedAssignerSubmitDurationView = &view.View{
Measure: SchedAssignerSubmitDuration,
Aggregation: defaultMillisecondsDistribution,
}
DagStorePRInitCountView = &view.View{
Measure: DagStorePRInitCount,
Aggregation: view.Count(),
@ -697,6 +719,7 @@ var MinerNodeViews = append([]*view.View{
WorkerCallsReturnedCountView,
WorkerUntrackedCallsReturnedView,
WorkerCallsReturnedDurationView,
SectorStatesView,
StorageFSAvailableView,
StorageAvailableView,
@ -708,6 +731,12 @@ var MinerNodeViews = append([]*view.View{
StorageReservedBytesView,
StorageLimitUsedBytesView,
StorageLimitMaxBytesView,
SchedAssignerCycleDurationView,
SchedAssignerCandidatesDurationView,
SchedAssignerWindowSelectionDurationView,
SchedAssignerSubmitDurationView,
DagStorePRInitCountView,
DagStorePRBytesRequestedView,
DagStorePRBytesDiscardedView,

View File

@ -105,7 +105,7 @@ func New(ctx context.Context, lstor *paths.Local, stor paths.Store, ls paths.Loc
return nil, xerrors.Errorf("creating prover instance: %w", err)
}
sh, err := newScheduler(sc.Assigner)
sh, err := newScheduler(ctx, sc.Assigner)
if err != nil {
return nil, err
}

View File

@ -110,7 +110,7 @@ func newTestMgr(ctx context.Context, t *testing.T, ds datastore.Datastore) (*Man
stor := paths.NewRemote(lstor, si, nil, 6000, &paths.DefaultPartialFileHandler{})
sh, err := newScheduler("")
sh, err := newScheduler(ctx, "")
require.NoError(t, err)
m := &Manager{

View File

@ -10,6 +10,7 @@ import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
@ -51,6 +52,8 @@ type WorkerSelector interface {
}
type Scheduler struct {
mctx context.Context // metrics context
assigner Assigner
workersLk sync.RWMutex
@ -146,7 +149,7 @@ type rmRequest struct {
res chan error
}
func newScheduler(assigner string) (*Scheduler, error) {
func newScheduler(ctx context.Context, assigner string) (*Scheduler, error) {
var a Assigner
switch assigner {
case "", "utilization":
@ -158,6 +161,7 @@ func newScheduler(assigner string) (*Scheduler, error) {
}
return &Scheduler{
mctx: ctx,
assigner: a,
Workers: map[storiface.WorkerID]*WorkerHandle{},
@ -366,6 +370,9 @@ func (sh *Scheduler) trySched() {
sh.workersLk.RLock()
defer sh.workersLk.RUnlock()
done := metrics.Timer(sh.mctx, metrics.SchedAssignerCycleDuration)
defer done()
sh.assigner.TrySched(sh)
}

View File

@ -5,6 +5,8 @@ import (
"math/rand"
"sort"
"sync"
"github.com/filecoin-project/lotus/metrics"
)
type WindowSelector func(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int
@ -52,6 +54,11 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {
// Step 1
throttle := make(chan struct{}, windowsLen)
partDone := metrics.Timer(sh.mctx, metrics.SchedAssignerCandidatesDuration)
defer func() {
partDone()
}()
var wg sync.WaitGroup
wg.Add(queueLen)
for i := 0; i < queueLen; i++ {
@ -152,9 +159,14 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {
log.Debugf("SCHED Acceptable win: %+v", acceptableWindows)
// Step 2
partDone()
partDone = metrics.Timer(sh.mctx, metrics.SchedAssignerWindowSelectionDuration)
scheduled := a.WindowSel(sh, queueLen, acceptableWindows, windows)
// Step 3
partDone()
partDone = metrics.Timer(sh.mctx, metrics.SchedAssignerSubmitDuration)
if scheduled == 0 {
return

View File

@ -226,7 +226,7 @@ func addTestWorker(t *testing.T, sched *Scheduler, index *paths.Index, name stri
}
func TestSchedStartStop(t *testing.T) {
sched, err := newScheduler("")
sched, err := newScheduler(context.Background(), "")
require.NoError(t, err)
go sched.runSched()
@ -356,7 +356,7 @@ func TestSched(t *testing.T) {
return func(t *testing.T) {
index := paths.NewIndex(nil)
sched, err := newScheduler("")
sched, err := newScheduler(ctx, "")
require.NoError(t, err)
sched.testSync = make(chan struct{})
@ -609,7 +609,7 @@ func BenchmarkTrySched(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
sched, err := newScheduler("")
sched, err := newScheduler(ctx, "")
require.NoError(b, err)
sched.Workers[storiface.WorkerID{}] = &WorkerHandle{
workerRpc: nil,