add vm execution metrics
This commit is contained in:
parent
97c5df540d
commit
61ba4baffa
@ -7,9 +7,13 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"go.opencensus.io/stats"
|
||||||
|
"go.opencensus.io/tag"
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/filecoin-project/lotus/metrics"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -81,6 +85,7 @@ func (e *vmExecutor) Done() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type executionToken struct {
|
type executionToken struct {
|
||||||
|
lane ExecutionLane
|
||||||
reserved int
|
reserved int
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -99,6 +104,9 @@ type executionEnv struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (e *executionEnv) getToken(lane ExecutionLane) *executionToken {
|
func (e *executionEnv) getToken(lane ExecutionLane) *executionToken {
|
||||||
|
metricsUp(metrics.VMExecutionWaiting, lane)
|
||||||
|
defer metricsDown(metrics.VMExecutionWaiting, lane)
|
||||||
|
|
||||||
e.mx.Lock()
|
e.mx.Lock()
|
||||||
defer e.mx.Unlock()
|
defer e.mx.Unlock()
|
||||||
|
|
||||||
@ -109,7 +117,9 @@ func (e *executionEnv) getToken(lane ExecutionLane) *executionToken {
|
|||||||
}
|
}
|
||||||
|
|
||||||
e.available--
|
e.available--
|
||||||
return &executionToken{reserved: 0}
|
|
||||||
|
metricsUp(metrics.VMExecutionActive, lane)
|
||||||
|
return &executionToken{lane: lane, reserved: 0}
|
||||||
|
|
||||||
case ExecutionLanePriority:
|
case ExecutionLanePriority:
|
||||||
for e.available == 0 {
|
for e.available == 0 {
|
||||||
@ -123,7 +133,9 @@ func (e *executionEnv) getToken(lane ExecutionLane) *executionToken {
|
|||||||
e.reserved--
|
e.reserved--
|
||||||
reserving = 1
|
reserving = 1
|
||||||
}
|
}
|
||||||
return &executionToken{reserved: reserving}
|
|
||||||
|
metricsUp(metrics.VMExecutionActive, lane)
|
||||||
|
return &executionToken{lane: lane, reserved: reserving}
|
||||||
|
|
||||||
default:
|
default:
|
||||||
// already checked at interface boundary in NewVM, so this is appropriate
|
// already checked at interface boundary in NewVM, so this is appropriate
|
||||||
@ -139,6 +151,29 @@ func (e *executionEnv) putToken(token *executionToken) {
|
|||||||
e.reserved += token.reserved
|
e.reserved += token.reserved
|
||||||
|
|
||||||
e.cond.Broadcast()
|
e.cond.Broadcast()
|
||||||
|
|
||||||
|
metricsDown(metrics.VMExecutionActive, token.lane)
|
||||||
|
}
|
||||||
|
|
||||||
|
func metricsUp(metric *stats.Int64Measure, lane ExecutionLane) {
|
||||||
|
metricsAdjust(metric, lane, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func metricsDown(metric *stats.Int64Measure, lane ExecutionLane) {
|
||||||
|
metricsAdjust(metric, lane, -1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func metricsAdjust(metric *stats.Int64Measure, lane ExecutionLane, delta int) {
|
||||||
|
laneName := "default"
|
||||||
|
if lane > ExecutionLaneDefault {
|
||||||
|
laneName = "priority"
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, _ := tag.New(
|
||||||
|
context.Background(),
|
||||||
|
tag.Upsert(metrics.ExecutionLane, laneName),
|
||||||
|
)
|
||||||
|
stats.Record(ctx, metric.M(int64(delta)))
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
@ -58,6 +58,9 @@ var (
|
|||||||
ProtocolID, _ = tag.NewKey("proto")
|
ProtocolID, _ = tag.NewKey("proto")
|
||||||
Direction, _ = tag.NewKey("direction")
|
Direction, _ = tag.NewKey("direction")
|
||||||
UseFD, _ = tag.NewKey("use_fd")
|
UseFD, _ = tag.NewKey("use_fd")
|
||||||
|
|
||||||
|
// vm execution
|
||||||
|
ExecutionLane, _ = tag.NewKey("lane")
|
||||||
)
|
)
|
||||||
|
|
||||||
// Measures
|
// Measures
|
||||||
@ -121,6 +124,8 @@ var (
|
|||||||
VMApplyFlush = stats.Float64("vm/applyblocks_flush", "Time spent flushing vm state", stats.UnitMilliseconds)
|
VMApplyFlush = stats.Float64("vm/applyblocks_flush", "Time spent flushing vm state", stats.UnitMilliseconds)
|
||||||
VMSends = stats.Int64("vm/sends", "Counter for sends processed by the VM", stats.UnitDimensionless)
|
VMSends = stats.Int64("vm/sends", "Counter for sends processed by the VM", stats.UnitDimensionless)
|
||||||
VMApplied = stats.Int64("vm/applied", "Counter for messages (including internal messages) processed by the VM", stats.UnitDimensionless)
|
VMApplied = stats.Int64("vm/applied", "Counter for messages (including internal messages) processed by the VM", stats.UnitDimensionless)
|
||||||
|
VMExecutionWaiting = stats.Int64("vm/execution_waiting", "Counter for VM executions waiting to be assigned to a lane", stats.UnitDimensionless)
|
||||||
|
VMExecutionActive = stats.Int64("vm/execution_default", "Counter for active VM executions", stats.UnitDimensionless)
|
||||||
|
|
||||||
// miner
|
// miner
|
||||||
WorkerCallsStarted = stats.Int64("sealing/worker_calls_started", "Counter of started worker tasks", stats.UnitDimensionless)
|
WorkerCallsStarted = stats.Int64("sealing/worker_calls_started", "Counter of started worker tasks", stats.UnitDimensionless)
|
||||||
@ -363,6 +368,16 @@ var (
|
|||||||
Measure: VMApplied,
|
Measure: VMApplied,
|
||||||
Aggregation: view.LastValue(),
|
Aggregation: view.LastValue(),
|
||||||
}
|
}
|
||||||
|
VMExecutionWaitingView = &view.View{
|
||||||
|
Measure: VMExecutionWaiting,
|
||||||
|
Aggregation: view.LastValue(),
|
||||||
|
TagKeys: []tag.Key{ExecutionLane},
|
||||||
|
}
|
||||||
|
VMExecutionActiveView = &view.View{
|
||||||
|
Measure: VMExecutionActive,
|
||||||
|
Aggregation: view.LastValue(),
|
||||||
|
TagKeys: []tag.Key{ExecutionLane},
|
||||||
|
}
|
||||||
|
|
||||||
// miner
|
// miner
|
||||||
WorkerCallsStartedView = &view.View{
|
WorkerCallsStartedView = &view.View{
|
||||||
|
Loading…
Reference in New Issue
Block a user