diff --git a/extern/storage-sealing/fsm.go b/extern/storage-sealing/fsm.go index 00e38694d..10bec7e0b 100644 --- a/extern/storage-sealing/fsm.go +++ b/extern/storage-sealing/fsm.go @@ -475,7 +475,7 @@ func (m *Sealing) onUpdateSector(ctx context.Context, state *SectorInfo) error { return xerrors.Errorf("getting config: %w", err) } - shouldUpdateInput := m.stats.updateSector(cfg, m.minerSectorID(state.SectorNumber), state.State) + shouldUpdateInput := m.stats.updateSector(ctx, cfg, m.minerSectorID(state.SectorNumber), state.State) // trigger more input processing when we've dipped below max sealing limits if shouldUpdateInput { diff --git a/extern/storage-sealing/fsm_test.go b/extern/storage-sealing/fsm_test.go index 5ddef0d53..10ee17c6b 100644 --- a/extern/storage-sealing/fsm_test.go +++ b/extern/storage-sealing/fsm_test.go @@ -33,7 +33,8 @@ func TestHappyPath(t *testing.T) { s: &Sealing{ maddr: ma, stats: SectorStats{ - bySector: map[abi.SectorID]statSectorState{}, + bySector: map[abi.SectorID]SectorState{}, + byState: map[SectorState]int64{}, }, notifee: func(before, after SectorInfo) { notif = append(notif, struct{ before, after SectorInfo }{before, after}) @@ -94,7 +95,8 @@ func TestHappyPathFinalizeEarly(t *testing.T) { s: &Sealing{ maddr: ma, stats: SectorStats{ - bySector: map[abi.SectorID]statSectorState{}, + bySector: map[abi.SectorID]SectorState{}, + byState: map[SectorState]int64{}, }, notifee: func(before, after SectorInfo) { notif = append(notif, struct{ before, after SectorInfo }{before, after}) @@ -161,7 +163,8 @@ func TestCommitFinalizeFailed(t *testing.T) { s: &Sealing{ maddr: ma, stats: SectorStats{ - bySector: map[abi.SectorID]statSectorState{}, + bySector: map[abi.SectorID]SectorState{}, + byState: map[SectorState]int64{}, }, notifee: func(before, after SectorInfo) { notif = append(notif, struct{ before, after SectorInfo }{before, after}) @@ -199,7 +202,8 @@ func TestSeedRevert(t *testing.T) { s: &Sealing{ maddr: ma, stats: SectorStats{ - bySector: map[abi.SectorID]statSectorState{}, + bySector: map[abi.SectorID]SectorState{}, + byState: map[SectorState]int64{}, }, }, t: t, @@ -252,7 +256,8 @@ func TestPlanCommittingHandlesSectorCommitFailed(t *testing.T) { s: &Sealing{ maddr: ma, stats: SectorStats{ - bySector: map[abi.SectorID]statSectorState{}, + bySector: map[abi.SectorID]SectorState{}, + byState: map[SectorState]int64{}, }, }, t: t, @@ -289,7 +294,8 @@ func TestBrokenState(t *testing.T) { s: &Sealing{ maddr: ma, stats: SectorStats{ - bySector: map[abi.SectorID]statSectorState{}, + bySector: map[abi.SectorID]SectorState{}, + byState: map[SectorState]int64{}, }, notifee: func(before, after SectorInfo) { notif = append(notif, struct{ before, after SectorInfo }{before, after}) @@ -324,7 +330,8 @@ func TestTicketExpired(t *testing.T) { s: &Sealing{ maddr: ma, stats: SectorStats{ - bySector: map[abi.SectorID]statSectorState{}, + bySector: map[abi.SectorID]SectorState{}, + byState: map[SectorState]int64{}, }, notifee: func(before, after SectorInfo) { notif = append(notif, struct{ before, after SectorInfo }{before, after}) diff --git a/extern/storage-sealing/input.go b/extern/storage-sealing/input.go index 875c33cdf..60c3a79e2 100644 --- a/extern/storage-sealing/input.go +++ b/extern/storage-sealing/input.go @@ -473,7 +473,7 @@ func (m *Sealing) createSector(ctx context.Context, cfg sealiface.Config, sp abi } // update stats early, fsm planner would do that async - m.stats.updateSector(cfg, m.minerSectorID(sid), UndefinedSectorState) + m.stats.updateSector(ctx, cfg, m.minerSectorID(sid), UndefinedSectorState) return sid, nil } diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index 165003bc9..583bed052 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -166,7 +166,8 @@ func New(mctx context.Context, api SealingAPI, fc config.MinerFeeConfig, events getConfig: gc, stats: SectorStats{ - bySector: map[abi.SectorID]statSectorState{}, + bySector: map[abi.SectorID]SectorState{}, + byState: map[SectorState]int64{}, }, } s.startupWait.Add(1) diff --git a/extern/storage-sealing/stats.go b/extern/storage-sealing/stats.go index 28556866a..12d951ddb 100644 --- a/extern/storage-sealing/stats.go +++ b/extern/storage-sealing/stats.go @@ -1,6 +1,10 @@ package sealing import ( + "context" + "github.com/filecoin-project/lotus/metrics" + "go.opencensus.io/stats" + "go.opencensus.io/tag" "sync" "github.com/filecoin-project/go-state-types/abi" @@ -20,11 +24,12 @@ const ( type SectorStats struct { lk sync.Mutex - bySector map[abi.SectorID]statSectorState + bySector map[abi.SectorID]SectorState + byState map[SectorState]int64 totals [nsst]uint64 } -func (ss *SectorStats) updateSector(cfg sealiface.Config, id abi.SectorID, st SectorState) (updateInput bool) { +func (ss *SectorStats) updateSector(ctx context.Context, cfg sealiface.Config, id abi.SectorID, st SectorState) (updateInput bool) { ss.lk.Lock() defer ss.lk.Unlock() @@ -34,12 +39,20 @@ func (ss *SectorStats) updateSector(cfg sealiface.Config, id abi.SectorID, st Se // update totals oldst, found := ss.bySector[id] if found { - ss.totals[oldst]-- + ss.totals[toStatState(oldst, cfg.FinalizeEarly)]-- + ss.byState[oldst]-- + + mctx, _ := tag.New(ctx, tag.Upsert(metrics.SectorState, string(oldst))) + stats.Record(mctx, metrics.SectorStates.M(ss.byState[oldst])) } sst := toStatState(st, cfg.FinalizeEarly) - ss.bySector[id] = sst + ss.bySector[id] = st ss.totals[sst]++ + ss.byState[st]++ + + mctx, _ := tag.New(ctx, tag.Upsert(metrics.SectorState, string(st))) + stats.Record(mctx, metrics.SectorStates.M(ss.byState[st])) // check if we may need be able to process more deals sealing := ss.curSealingLocked() diff --git a/metrics/metrics.go b/metrics/metrics.go index fd538839d..63c0dede9 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -46,6 +46,7 @@ var ( TaskType, _ = tag.NewKey("task_type") WorkerHostname, _ = tag.NewKey("worker_hostname") StorageID, _ = tag.NewKey("storage_id") + SectorState, _ = tag.NewKey("sector_state") ) // Measures @@ -98,6 +99,8 @@ var ( WorkerCallsReturnedDuration = stats.Float64("sealing/worker_calls_returned_ms", "Counter of returned worker tasks", stats.UnitMilliseconds) WorkerUntrackedCallsReturned = stats.Int64("sealing/worker_untracked_calls_returned", "Counter of returned untracked worker tasks", stats.UnitDimensionless) + SectorStates = stats.Int64("sealing/states", "Number of sectors in each state", stats.UnitDimensionless) + StorageFSAvailable = stats.Float64("storage/path_fs_available_frac", "Fraction of filesystem available storage", stats.UnitDimensionless) StorageAvailable = stats.Float64("storage/path_available_frac", "Fraction of available storage", stats.UnitDimensionless) StorageReserved = stats.Float64("storage/path_reserved_frac", "Fraction of reserved storage", stats.UnitDimensionless) @@ -308,6 +311,11 @@ var ( Aggregation: workMillisecondsDistribution, TagKeys: []tag.Key{TaskType, WorkerHostname}, } + SectorStatesView = &view.View{ + Measure: SectorStates, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{SectorState}, + } StorageFSAvailableView = &view.View{ Measure: StorageFSAvailable, Aggregation: view.LastValue(), @@ -441,14 +449,17 @@ var MinerNodeViews = append([]*view.View{ WorkerCallsReturnedCountView, WorkerUntrackedCallsReturnedView, WorkerCallsReturnedDurationView, + SectorStatesView, StorageFSAvailableView, StorageAvailableView, StorageReservedView, StorageLimitUsedView, + StorageCapacityBytesView, StorageFSAvailableBytesView, StorageAvailableBytesView, StorageReservedBytesView, StorageLimitUsedBytesView, + StorageLimitMaxBytesView, }, DefaultViews...) // SinceInMilliseconds returns the duration of time since the provide time as a float64.