Merge pull request #7541 from filecoin-project/feat/sector-state-prom
Expose per-state sector counts on the prometheus endpoint
This commit is contained in:
commit
a60ccbf106
2
extern/storage-sealing/fsm.go
vendored
2
extern/storage-sealing/fsm.go
vendored
@ -475,7 +475,7 @@ func (m *Sealing) onUpdateSector(ctx context.Context, state *SectorInfo) error {
|
|||||||
return xerrors.Errorf("getting config: %w", err)
|
return xerrors.Errorf("getting config: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
shouldUpdateInput := m.stats.updateSector(cfg, m.minerSectorID(state.SectorNumber), state.State)
|
shouldUpdateInput := m.stats.updateSector(ctx, cfg, m.minerSectorID(state.SectorNumber), state.State)
|
||||||
|
|
||||||
// trigger more input processing when we've dipped below max sealing limits
|
// trigger more input processing when we've dipped below max sealing limits
|
||||||
if shouldUpdateInput {
|
if shouldUpdateInput {
|
||||||
|
21
extern/storage-sealing/fsm_test.go
vendored
21
extern/storage-sealing/fsm_test.go
vendored
@ -33,7 +33,8 @@ func TestHappyPath(t *testing.T) {
|
|||||||
s: &Sealing{
|
s: &Sealing{
|
||||||
maddr: ma,
|
maddr: ma,
|
||||||
stats: SectorStats{
|
stats: SectorStats{
|
||||||
bySector: map[abi.SectorID]statSectorState{},
|
bySector: map[abi.SectorID]SectorState{},
|
||||||
|
byState: map[SectorState]int64{},
|
||||||
},
|
},
|
||||||
notifee: func(before, after SectorInfo) {
|
notifee: func(before, after SectorInfo) {
|
||||||
notif = append(notif, struct{ before, after SectorInfo }{before, after})
|
notif = append(notif, struct{ before, after SectorInfo }{before, after})
|
||||||
@ -94,7 +95,8 @@ func TestHappyPathFinalizeEarly(t *testing.T) {
|
|||||||
s: &Sealing{
|
s: &Sealing{
|
||||||
maddr: ma,
|
maddr: ma,
|
||||||
stats: SectorStats{
|
stats: SectorStats{
|
||||||
bySector: map[abi.SectorID]statSectorState{},
|
bySector: map[abi.SectorID]SectorState{},
|
||||||
|
byState: map[SectorState]int64{},
|
||||||
},
|
},
|
||||||
notifee: func(before, after SectorInfo) {
|
notifee: func(before, after SectorInfo) {
|
||||||
notif = append(notif, struct{ before, after SectorInfo }{before, after})
|
notif = append(notif, struct{ before, after SectorInfo }{before, after})
|
||||||
@ -161,7 +163,8 @@ func TestCommitFinalizeFailed(t *testing.T) {
|
|||||||
s: &Sealing{
|
s: &Sealing{
|
||||||
maddr: ma,
|
maddr: ma,
|
||||||
stats: SectorStats{
|
stats: SectorStats{
|
||||||
bySector: map[abi.SectorID]statSectorState{},
|
bySector: map[abi.SectorID]SectorState{},
|
||||||
|
byState: map[SectorState]int64{},
|
||||||
},
|
},
|
||||||
notifee: func(before, after SectorInfo) {
|
notifee: func(before, after SectorInfo) {
|
||||||
notif = append(notif, struct{ before, after SectorInfo }{before, after})
|
notif = append(notif, struct{ before, after SectorInfo }{before, after})
|
||||||
@ -199,7 +202,8 @@ func TestSeedRevert(t *testing.T) {
|
|||||||
s: &Sealing{
|
s: &Sealing{
|
||||||
maddr: ma,
|
maddr: ma,
|
||||||
stats: SectorStats{
|
stats: SectorStats{
|
||||||
bySector: map[abi.SectorID]statSectorState{},
|
bySector: map[abi.SectorID]SectorState{},
|
||||||
|
byState: map[SectorState]int64{},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
t: t,
|
t: t,
|
||||||
@ -252,7 +256,8 @@ func TestPlanCommittingHandlesSectorCommitFailed(t *testing.T) {
|
|||||||
s: &Sealing{
|
s: &Sealing{
|
||||||
maddr: ma,
|
maddr: ma,
|
||||||
stats: SectorStats{
|
stats: SectorStats{
|
||||||
bySector: map[abi.SectorID]statSectorState{},
|
bySector: map[abi.SectorID]SectorState{},
|
||||||
|
byState: map[SectorState]int64{},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
t: t,
|
t: t,
|
||||||
@ -289,7 +294,8 @@ func TestBrokenState(t *testing.T) {
|
|||||||
s: &Sealing{
|
s: &Sealing{
|
||||||
maddr: ma,
|
maddr: ma,
|
||||||
stats: SectorStats{
|
stats: SectorStats{
|
||||||
bySector: map[abi.SectorID]statSectorState{},
|
bySector: map[abi.SectorID]SectorState{},
|
||||||
|
byState: map[SectorState]int64{},
|
||||||
},
|
},
|
||||||
notifee: func(before, after SectorInfo) {
|
notifee: func(before, after SectorInfo) {
|
||||||
notif = append(notif, struct{ before, after SectorInfo }{before, after})
|
notif = append(notif, struct{ before, after SectorInfo }{before, after})
|
||||||
@ -324,7 +330,8 @@ func TestTicketExpired(t *testing.T) {
|
|||||||
s: &Sealing{
|
s: &Sealing{
|
||||||
maddr: ma,
|
maddr: ma,
|
||||||
stats: SectorStats{
|
stats: SectorStats{
|
||||||
bySector: map[abi.SectorID]statSectorState{},
|
bySector: map[abi.SectorID]SectorState{},
|
||||||
|
byState: map[SectorState]int64{},
|
||||||
},
|
},
|
||||||
notifee: func(before, after SectorInfo) {
|
notifee: func(before, after SectorInfo) {
|
||||||
notif = append(notif, struct{ before, after SectorInfo }{before, after})
|
notif = append(notif, struct{ before, after SectorInfo }{before, after})
|
||||||
|
2
extern/storage-sealing/input.go
vendored
2
extern/storage-sealing/input.go
vendored
@ -473,7 +473,7 @@ func (m *Sealing) createSector(ctx context.Context, cfg sealiface.Config, sp abi
|
|||||||
}
|
}
|
||||||
|
|
||||||
// update stats early, fsm planner would do that async
|
// update stats early, fsm planner would do that async
|
||||||
m.stats.updateSector(cfg, m.minerSectorID(sid), UndefinedSectorState)
|
m.stats.updateSector(ctx, cfg, m.minerSectorID(sid), UndefinedSectorState)
|
||||||
|
|
||||||
return sid, nil
|
return sid, nil
|
||||||
}
|
}
|
||||||
|
3
extern/storage-sealing/sealing.go
vendored
3
extern/storage-sealing/sealing.go
vendored
@ -166,7 +166,8 @@ func New(mctx context.Context, api SealingAPI, fc config.MinerFeeConfig, events
|
|||||||
getConfig: gc,
|
getConfig: gc,
|
||||||
|
|
||||||
stats: SectorStats{
|
stats: SectorStats{
|
||||||
bySector: map[abi.SectorID]statSectorState{},
|
bySector: map[abi.SectorID]SectorState{},
|
||||||
|
byState: map[SectorState]int64{},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
s.startupWait.Add(1)
|
s.startupWait.Add(1)
|
||||||
|
23
extern/storage-sealing/stats.go
vendored
23
extern/storage-sealing/stats.go
vendored
@ -1,10 +1,16 @@
|
|||||||
package sealing
|
package sealing
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"go.opencensus.io/stats"
|
||||||
|
"go.opencensus.io/tag"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
|
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
|
||||||
|
"github.com/filecoin-project/lotus/metrics"
|
||||||
)
|
)
|
||||||
|
|
||||||
type statSectorState int
|
type statSectorState int
|
||||||
@ -20,11 +26,12 @@ const (
|
|||||||
type SectorStats struct {
|
type SectorStats struct {
|
||||||
lk sync.Mutex
|
lk sync.Mutex
|
||||||
|
|
||||||
bySector map[abi.SectorID]statSectorState
|
bySector map[abi.SectorID]SectorState
|
||||||
|
byState map[SectorState]int64
|
||||||
totals [nsst]uint64
|
totals [nsst]uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ss *SectorStats) updateSector(cfg sealiface.Config, id abi.SectorID, st SectorState) (updateInput bool) {
|
func (ss *SectorStats) updateSector(ctx context.Context, cfg sealiface.Config, id abi.SectorID, st SectorState) (updateInput bool) {
|
||||||
ss.lk.Lock()
|
ss.lk.Lock()
|
||||||
defer ss.lk.Unlock()
|
defer ss.lk.Unlock()
|
||||||
|
|
||||||
@ -34,12 +41,20 @@ func (ss *SectorStats) updateSector(cfg sealiface.Config, id abi.SectorID, st Se
|
|||||||
// update totals
|
// update totals
|
||||||
oldst, found := ss.bySector[id]
|
oldst, found := ss.bySector[id]
|
||||||
if found {
|
if found {
|
||||||
ss.totals[oldst]--
|
ss.totals[toStatState(oldst, cfg.FinalizeEarly)]--
|
||||||
|
ss.byState[oldst]--
|
||||||
|
|
||||||
|
mctx, _ := tag.New(ctx, tag.Upsert(metrics.SectorState, string(oldst)))
|
||||||
|
stats.Record(mctx, metrics.SectorStates.M(ss.byState[oldst]))
|
||||||
}
|
}
|
||||||
|
|
||||||
sst := toStatState(st, cfg.FinalizeEarly)
|
sst := toStatState(st, cfg.FinalizeEarly)
|
||||||
ss.bySector[id] = sst
|
ss.bySector[id] = st
|
||||||
ss.totals[sst]++
|
ss.totals[sst]++
|
||||||
|
ss.byState[st]++
|
||||||
|
|
||||||
|
mctx, _ := tag.New(ctx, tag.Upsert(metrics.SectorState, string(st)))
|
||||||
|
stats.Record(mctx, metrics.SectorStates.M(ss.byState[st]))
|
||||||
|
|
||||||
// check if we may need be able to process more deals
|
// check if we may need be able to process more deals
|
||||||
sealing := ss.curSealingLocked()
|
sealing := ss.curSealingLocked()
|
||||||
|
@ -46,6 +46,7 @@ var (
|
|||||||
TaskType, _ = tag.NewKey("task_type")
|
TaskType, _ = tag.NewKey("task_type")
|
||||||
WorkerHostname, _ = tag.NewKey("worker_hostname")
|
WorkerHostname, _ = tag.NewKey("worker_hostname")
|
||||||
StorageID, _ = tag.NewKey("storage_id")
|
StorageID, _ = tag.NewKey("storage_id")
|
||||||
|
SectorState, _ = tag.NewKey("sector_state")
|
||||||
)
|
)
|
||||||
|
|
||||||
// Measures
|
// Measures
|
||||||
@ -114,6 +115,8 @@ var (
|
|||||||
WorkerCallsReturnedDuration = stats.Float64("sealing/worker_calls_returned_ms", "Counter of returned worker tasks", stats.UnitMilliseconds)
|
WorkerCallsReturnedDuration = stats.Float64("sealing/worker_calls_returned_ms", "Counter of returned worker tasks", stats.UnitMilliseconds)
|
||||||
WorkerUntrackedCallsReturned = stats.Int64("sealing/worker_untracked_calls_returned", "Counter of returned untracked worker tasks", stats.UnitDimensionless)
|
WorkerUntrackedCallsReturned = stats.Int64("sealing/worker_untracked_calls_returned", "Counter of returned untracked worker tasks", stats.UnitDimensionless)
|
||||||
|
|
||||||
|
SectorStates = stats.Int64("sealing/states", "Number of sectors in each state", stats.UnitDimensionless)
|
||||||
|
|
||||||
StorageFSAvailable = stats.Float64("storage/path_fs_available_frac", "Fraction of filesystem available storage", stats.UnitDimensionless)
|
StorageFSAvailable = stats.Float64("storage/path_fs_available_frac", "Fraction of filesystem available storage", stats.UnitDimensionless)
|
||||||
StorageAvailable = stats.Float64("storage/path_available_frac", "Fraction of available storage", stats.UnitDimensionless)
|
StorageAvailable = stats.Float64("storage/path_available_frac", "Fraction of available storage", stats.UnitDimensionless)
|
||||||
StorageReserved = stats.Float64("storage/path_reserved_frac", "Fraction of reserved storage", stats.UnitDimensionless)
|
StorageReserved = stats.Float64("storage/path_reserved_frac", "Fraction of reserved storage", stats.UnitDimensionless)
|
||||||
@ -324,6 +327,11 @@ var (
|
|||||||
Aggregation: workMillisecondsDistribution,
|
Aggregation: workMillisecondsDistribution,
|
||||||
TagKeys: []tag.Key{TaskType, WorkerHostname},
|
TagKeys: []tag.Key{TaskType, WorkerHostname},
|
||||||
}
|
}
|
||||||
|
SectorStatesView = &view.View{
|
||||||
|
Measure: SectorStates,
|
||||||
|
Aggregation: view.LastValue(),
|
||||||
|
TagKeys: []tag.Key{SectorState},
|
||||||
|
}
|
||||||
StorageFSAvailableView = &view.View{
|
StorageFSAvailableView = &view.View{
|
||||||
Measure: StorageFSAvailable,
|
Measure: StorageFSAvailable,
|
||||||
Aggregation: view.LastValue(),
|
Aggregation: view.LastValue(),
|
||||||
@ -520,14 +528,17 @@ var MinerNodeViews = append([]*view.View{
|
|||||||
WorkerCallsReturnedCountView,
|
WorkerCallsReturnedCountView,
|
||||||
WorkerUntrackedCallsReturnedView,
|
WorkerUntrackedCallsReturnedView,
|
||||||
WorkerCallsReturnedDurationView,
|
WorkerCallsReturnedDurationView,
|
||||||
|
SectorStatesView,
|
||||||
StorageFSAvailableView,
|
StorageFSAvailableView,
|
||||||
StorageAvailableView,
|
StorageAvailableView,
|
||||||
StorageReservedView,
|
StorageReservedView,
|
||||||
StorageLimitUsedView,
|
StorageLimitUsedView,
|
||||||
|
StorageCapacityBytesView,
|
||||||
StorageFSAvailableBytesView,
|
StorageFSAvailableBytesView,
|
||||||
StorageAvailableBytesView,
|
StorageAvailableBytesView,
|
||||||
StorageReservedBytesView,
|
StorageReservedBytesView,
|
||||||
StorageLimitUsedBytesView,
|
StorageLimitUsedBytesView,
|
||||||
|
StorageLimitMaxBytesView,
|
||||||
}, DefaultViews...)
|
}, DefaultViews...)
|
||||||
|
|
||||||
// SinceInMilliseconds returns the duration of time since the provide time as a float64.
|
// SinceInMilliseconds returns the duration of time since the provide time as a float64.
|
||||||
|
Loading…
Reference in New Issue
Block a user