diff --git a/chain/vm/execution.go b/chain/vm/execution.go index b2573dffc..d984d0095 100644 --- a/chain/vm/execution.go +++ b/chain/vm/execution.go @@ -7,9 +7,13 @@ import ( "strconv" "sync" + "go.opencensus.io/stats" + "go.opencensus.io/tag" + "github.com/ipfs/go-cid" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/metrics" ) const ( @@ -81,6 +85,7 @@ func (e *vmExecutor) Done() { } type executionToken struct { + lane ExecutionLane reserved int } @@ -99,6 +104,9 @@ type executionEnv struct { } func (e *executionEnv) getToken(lane ExecutionLane) *executionToken { + metricsUp(metrics.VMExecutionWaiting, lane) + defer metricsDown(metrics.VMExecutionWaiting, lane) + e.mx.Lock() defer e.mx.Unlock() @@ -109,7 +117,9 @@ func (e *executionEnv) getToken(lane ExecutionLane) *executionToken { } e.available-- - return &executionToken{reserved: 0} + + metricsUp(metrics.VMExecutionActive, lane) + return &executionToken{lane: lane, reserved: 0} case ExecutionLanePriority: for e.available == 0 { @@ -123,7 +133,9 @@ func (e *executionEnv) getToken(lane ExecutionLane) *executionToken { e.reserved-- reserving = 1 } - return &executionToken{reserved: reserving} + + metricsUp(metrics.VMExecutionActive, lane) + return &executionToken{lane: lane, reserved: reserving} default: // already checked at interface boundary in NewVM, so this is appropriate @@ -139,6 +151,29 @@ func (e *executionEnv) putToken(token *executionToken) { e.reserved += token.reserved e.cond.Broadcast() + + metricsDown(metrics.VMExecutionActive, token.lane) +} + +func metricsUp(metric *stats.Int64Measure, lane ExecutionLane) { + metricsAdjust(metric, lane, 1) +} + +func metricsDown(metric *stats.Int64Measure, lane ExecutionLane) { + metricsAdjust(metric, lane, -1) +} + +func metricsAdjust(metric *stats.Int64Measure, lane ExecutionLane, delta int) { + laneName := "default" + if lane > ExecutionLaneDefault { + laneName = "priority" + } + + ctx, _ := tag.New( + context.Background(), + tag.Upsert(metrics.ExecutionLane, laneName), + ) + stats.Record(ctx, metric.M(int64(delta))) } func init() { diff --git a/metrics/metrics.go b/metrics/metrics.go index ca638ac27..61bd86fbd 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -58,6 +58,9 @@ var ( ProtocolID, _ = tag.NewKey("proto") Direction, _ = tag.NewKey("direction") UseFD, _ = tag.NewKey("use_fd") + + // vm execution + ExecutionLane, _ = tag.NewKey("lane") ) // Measures @@ -121,6 +124,8 @@ var ( VMApplyFlush = stats.Float64("vm/applyblocks_flush", "Time spent flushing vm state", stats.UnitMilliseconds) VMSends = stats.Int64("vm/sends", "Counter for sends processed by the VM", stats.UnitDimensionless) VMApplied = stats.Int64("vm/applied", "Counter for messages (including internal messages) processed by the VM", stats.UnitDimensionless) + VMExecutionWaiting = stats.Int64("vm/execution_waiting", "Counter for VM executions waiting to be assigned to a lane", stats.UnitDimensionless) + VMExecutionActive = stats.Int64("vm/execution_default", "Counter for active VM executions", stats.UnitDimensionless) // miner WorkerCallsStarted = stats.Int64("sealing/worker_calls_started", "Counter of started worker tasks", stats.UnitDimensionless) @@ -363,6 +368,16 @@ var ( Measure: VMApplied, Aggregation: view.LastValue(), } + VMExecutionWaitingView = &view.View{ + Measure: VMExecutionWaiting, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{ExecutionLane}, + } + VMExecutionActiveView = &view.View{ + Measure: VMExecutionActive, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{ExecutionLane}, + } // miner WorkerCallsStartedView = &view.View{