Merge pull request #6056 from filecoin-project/feat/vm-metrics
stmgr: Improve ApplyBlocks metrics
This commit is contained in:
commit
2e63690125
@ -5,11 +5,13 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
cbor "github.com/ipfs/go-ipld-cbor"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
cbg "github.com/whyrusleeping/cbor-gen"
|
||||
"go.opencensus.io/stats"
|
||||
"go.opencensus.io/trace"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
@ -43,6 +45,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/store"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/chain/vm"
|
||||
"github.com/filecoin-project/lotus/metrics"
|
||||
)
|
||||
|
||||
const LookbackNoLimit = api.LookbackNoLimit
|
||||
@ -280,6 +283,13 @@ func (sm *StateManager) ExecutionTrace(ctx context.Context, ts *types.TipSet) (c
|
||||
type ExecCallback func(cid.Cid, *types.Message, *vm.ApplyRet) error
|
||||
|
||||
func (sm *StateManager) ApplyBlocks(ctx context.Context, parentEpoch abi.ChainEpoch, pstate cid.Cid, bms []store.BlockMessages, epoch abi.ChainEpoch, r vm.Rand, cb ExecCallback, baseFee abi.TokenAmount, ts *types.TipSet) (cid.Cid, cid.Cid, error) {
|
||||
done := metrics.Timer(ctx, metrics.VMApplyBlocksTotal)
|
||||
defer done()
|
||||
|
||||
partDone := metrics.Timer(ctx, metrics.VMApplyEarly)
|
||||
defer func() {
|
||||
partDone()
|
||||
}()
|
||||
|
||||
makeVmWithBaseState := func(base cid.Cid) (*vm.VM, error) {
|
||||
vmopt := &vm.VMOpts{
|
||||
@ -303,7 +313,6 @@ func (sm *StateManager) ApplyBlocks(ctx context.Context, parentEpoch abi.ChainEp
|
||||
}
|
||||
|
||||
runCron := func(epoch abi.ChainEpoch) error {
|
||||
|
||||
cronMsg := &types.Message{
|
||||
To: cron.Address,
|
||||
From: builtin.SystemActorAddr,
|
||||
@ -362,6 +371,9 @@ func (sm *StateManager) ApplyBlocks(ctx context.Context, parentEpoch abi.ChainEp
|
||||
pstate = newState
|
||||
}
|
||||
|
||||
partDone()
|
||||
partDone = metrics.Timer(ctx, metrics.VMApplyMessages)
|
||||
|
||||
var receipts []cbg.CBORMarshaler
|
||||
processedMsgs := make(map[cid.Cid]struct{})
|
||||
for _, b := range bms {
|
||||
@ -426,10 +438,16 @@ func (sm *StateManager) ApplyBlocks(ctx context.Context, parentEpoch abi.ChainEp
|
||||
}
|
||||
}
|
||||
|
||||
partDone()
|
||||
partDone = metrics.Timer(ctx, metrics.VMApplyCron)
|
||||
|
||||
if err := runCron(epoch); err != nil {
|
||||
return cid.Cid{}, cid.Cid{}, err
|
||||
}
|
||||
|
||||
partDone()
|
||||
partDone = metrics.Timer(ctx, metrics.VMApplyFlush)
|
||||
|
||||
rectarr := blockadt.MakeEmptyArray(sm.cs.ActorStore(ctx))
|
||||
for i, receipt := range receipts {
|
||||
if err := rectarr.Set(uint64(i), receipt); err != nil {
|
||||
@ -446,6 +464,9 @@ func (sm *StateManager) ApplyBlocks(ctx context.Context, parentEpoch abi.ChainEp
|
||||
return cid.Undef, cid.Undef, xerrors.Errorf("vm flush failed: %w", err)
|
||||
}
|
||||
|
||||
stats.Record(ctx, metrics.VMSends.M(int64(atomic.LoadUint64(&vm.StatSends))),
|
||||
metrics.VMApplied.M(int64(atomic.LoadUint64(&vm.StatApplied))))
|
||||
|
||||
return st, rectroot, nil
|
||||
}
|
||||
|
||||
|
@ -76,6 +76,13 @@ var (
|
||||
PubsubDropRPC = stats.Int64("pubsub/drop_rpc", "Counter for total dropped RPCs", stats.UnitDimensionless)
|
||||
VMFlushCopyDuration = stats.Float64("vm/flush_copy_ms", "Time spent in VM Flush Copy", stats.UnitMilliseconds)
|
||||
VMFlushCopyCount = stats.Int64("vm/flush_copy_count", "Number of copied objects", stats.UnitDimensionless)
|
||||
VMApplyBlocksTotal = stats.Float64("vm/applyblocks_total_ms", "Time spent applying block state", stats.UnitMilliseconds)
|
||||
VMApplyMessages = stats.Float64("vm/applyblocks_messages", "Time spent applying block messages", stats.UnitMilliseconds)
|
||||
VMApplyEarly = stats.Float64("vm/applyblocks_early", "Time spent in early apply-blocks (null cron, upgrades)", stats.UnitMilliseconds)
|
||||
VMApplyCron = stats.Float64("vm/applyblocks_cron", "Time spent in cron", stats.UnitMilliseconds)
|
||||
VMApplyFlush = stats.Float64("vm/applyblocks_flush", "Time spent flushing vm state", stats.UnitMilliseconds)
|
||||
VMSends = stats.Int64("vm/sends", "Counter for sends processed by the VM", stats.UnitDimensionless)
|
||||
VMApplied = stats.Int64("vm/applied", "Counter for messages (including internal messages) processed by the VM", stats.UnitDimensionless)
|
||||
|
||||
// miner
|
||||
WorkerCallsStarted = stats.Int64("sealing/worker_calls_started", "Counter of started worker tasks", stats.UnitDimensionless)
|
||||
@ -208,6 +215,34 @@ var (
|
||||
Measure: VMFlushCopyCount,
|
||||
Aggregation: view.Sum(),
|
||||
}
|
||||
VMApplyBlocksTotalView = &view.View{
|
||||
Measure: VMApplyBlocksTotal,
|
||||
Aggregation: defaultMillisecondsDistribution,
|
||||
}
|
||||
VMApplyMessagesView = &view.View{
|
||||
Measure: VMApplyMessages,
|
||||
Aggregation: defaultMillisecondsDistribution,
|
||||
}
|
||||
VMApplyEarlyView = &view.View{
|
||||
Measure: VMApplyEarly,
|
||||
Aggregation: defaultMillisecondsDistribution,
|
||||
}
|
||||
VMApplyCronView = &view.View{
|
||||
Measure: VMApplyCron,
|
||||
Aggregation: defaultMillisecondsDistribution,
|
||||
}
|
||||
VMApplyFlushView = &view.View{
|
||||
Measure: VMApplyFlush,
|
||||
Aggregation: defaultMillisecondsDistribution,
|
||||
}
|
||||
VMSendsView = &view.View{
|
||||
Measure: VMSends,
|
||||
Aggregation: view.LastValue(),
|
||||
}
|
||||
VMAppliedView = &view.View{
|
||||
Measure: VMApplied,
|
||||
Aggregation: view.LastValue(),
|
||||
}
|
||||
|
||||
// miner
|
||||
WorkerCallsStartedView = &view.View{
|
||||
@ -292,6 +327,13 @@ var ChainNodeViews = append([]*view.View{
|
||||
SplitstoreCompactionHotView,
|
||||
SplitstoreCompactionColdView,
|
||||
SplitstoreCompactionDeadView,
|
||||
VMApplyBlocksTotalView,
|
||||
VMApplyMessagesView,
|
||||
VMApplyEarlyView,
|
||||
VMApplyCronView,
|
||||
VMApplyFlushView,
|
||||
VMSendsView,
|
||||
VMAppliedView,
|
||||
}, DefaultViews...)
|
||||
|
||||
var MinerNodeViews = append([]*view.View{
|
||||
|
Loading…
Reference in New Issue
Block a user