From fe230f901eb272eea1c2419fd30dbfc548b5db9d Mon Sep 17 00:00:00 2001
From: Łukasz Magiera
Date: Sun, 21 Feb 2021 11:03:00 +0100
Subject: [PATCH] Collect worker task metrics

---
 cmd/lotus-gateway/main.go                    |   2 +-
 cmd/lotus-storage-miner/run.go               |  14 ++-
 cmd/lotus/daemon.go                          |  25 +++--
 cmd/lotus/rpc.go                             |  21 +---
 extern/sector-storage/manager.go             |  22 ++--
 extern/sector-storage/manager_calltracker.go |   6 +-
 extern/sector-storage/sched_worker.go        |   4 +-
 extern/sector-storage/worker_tracked.go      |  63 +++++++----
 metrics/exporter.go                          |  32 ++++++
 metrics/metrics.go                           | 112 ++++++++++++++-----
 10 files changed, 202 insertions(+), 99 deletions(-)
 create mode 100644 metrics/exporter.go

diff --git a/cmd/lotus-gateway/main.go b/cmd/lotus-gateway/main.go
index 5190ea798..23b743d73 100644
--- a/cmd/lotus-gateway/main.go
+++ b/cmd/lotus-gateway/main.go
@@ -77,7 +77,7 @@ var runCmd = &cli.Command{
 
 		// Register all metric views
 		if err := view.Register(
-			metrics.DefaultViews...,
+			metrics.ChainNodeViews...,
 		); err != nil {
 			log.Fatalf("Cannot register the view: %v", err)
 		}
diff --git a/cmd/lotus-storage-miner/run.go b/cmd/lotus-storage-miner/run.go
index 0c2fba8b3..741a18eb6 100644
--- a/cmd/lotus-storage-miner/run.go
+++ b/cmd/lotus-storage-miner/run.go
@@ -13,6 +13,7 @@ import (
 	"github.com/multiformats/go-multiaddr"
 	manet "github.com/multiformats/go-multiaddr/net"
 	"github.com/urfave/cli/v2"
+	"go.opencensus.io/stats"
 	"go.opencensus.io/stats/view"
 	"go.opencensus.io/tag"
 	"golang.org/x/xerrors"
@@ -68,14 +69,20 @@ var runCmd = &cli.Command{
 			return xerrors.Errorf("getting full node api: %w", err)
 		}
 		defer ncloser()
-		ctx := lcli.DaemonContext(cctx)
+		ctx, _ := tag.New(lcli.DaemonContext(cctx),
+			tag.Insert(metrics.Version, build.BuildVersion),
+			tag.Insert(metrics.Commit, build.CurrentCommit),
+			tag.Insert(metrics.NodeType, "miner"),
+		)
 
 		// Register all metric views
-		if err := view.Register(
-			metrics.DefaultViews...,
+		if err = view.Register(
+			metrics.MinerNodeViews...,
 		); err != nil {
 			log.Fatalf("Cannot register the view: %v", err)
 		}
+		// Set the metric to one so it is published to the exporter
+		stats.Record(ctx, metrics.LotusInfo.M(1))
 
 		v, err := nodeApi.Version(ctx)
 		if err != nil {
@@ -162,6 +169,7 @@ var runCmd = &cli.Command{
 
 		mux.Handle("/rpc/v0", rpcServer)
 		mux.PathPrefix("/remote").HandlerFunc(minerapi.(*impl.StorageMinerAPI).ServeRemote)
+		mux.Handle("/debug/metrics", metrics.Exporter())
 		mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof
 
 		ah := &auth.Handler{
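The run command above follows a common OpenCensus pattern: tag a context once with version/commit/node_type, then record a constant 1 on the info measure so the build metadata shows up as labels on a gauge. A minimal self-contained sketch of that pattern (the key and measure below are stand-ins, not the lotus ones):

package main

import (
	"context"
	"log"

	"go.opencensus.io/stats"
	"go.opencensus.io/stats/view"
	"go.opencensus.io/tag"
)

var (
	// Stand-ins for metrics.NodeType and metrics.LotusInfo.
	nodeType, _ = tag.NewKey("node_type")
	info        = stats.Int64("info", "Constant-1 build info gauge", stats.UnitDimensionless)
)

func main() {
	// Views must be registered before any recorded data is exported.
	if err := view.Register(&view.View{
		Measure:     info,
		Aggregation: view.LastValue(),
		TagKeys:     []tag.Key{nodeType},
	}); err != nil {
		log.Fatal(err)
	}

	// Tag the context once; every Record through it carries the tags.
	ctx, _ := tag.New(context.Background(), tag.Insert(nodeType, "miner"))
	stats.Record(ctx, info.M(1)) // the value is always 1; the labels carry the payload
}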
diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go
index 1cbad2fa9..b3eaf0f8c 100644
--- a/cmd/lotus/daemon.go
+++ b/cmd/lotus/daemon.go
@@ -192,7 +192,20 @@ var DaemonCmd = &cli.Command{
 			return fmt.Errorf("unrecognized profile type: %q", profile)
 		}
 
-		ctx, _ := tag.New(context.Background(), tag.Insert(metrics.Version, build.BuildVersion), tag.Insert(metrics.Commit, build.CurrentCommit))
+		ctx, _ := tag.New(context.Background(),
+			tag.Insert(metrics.Version, build.BuildVersion),
+			tag.Insert(metrics.Commit, build.CurrentCommit),
+			tag.Insert(metrics.NodeType, "chain"),
+		)
+		// Register all metric views
+		if err = view.Register(
+			metrics.ChainNodeViews...,
+		); err != nil {
+			log.Fatalf("Cannot register the view: %v", err)
+		}
+		// Set the metric to one so it is published to the exporter
+		stats.Record(ctx, metrics.LotusInfo.M(1))
+
 		{
 			dir, err := homedir.Expand(cctx.String("repo"))
 			if err != nil {
@@ -332,16 +345,6 @@ var DaemonCmd = &cli.Command{
 			}
 		}
 
-		// Register all metric views
-		if err = view.Register(
-			metrics.DefaultViews...,
-		); err != nil {
-			log.Fatalf("Cannot register the view: %v", err)
-		}
-
-		// Set the metric to one so it is published to the exporter
-		stats.Record(ctx, metrics.LotusInfo.M(1))
-
 		endpoint, err := r.APIEndpoint()
 		if err != nil {
 			return xerrors.Errorf("getting api endpoint: %w", err)
 		}
diff --git a/cmd/lotus/rpc.go b/cmd/lotus/rpc.go
index 82a1fb480..48720d833 100644
--- a/cmd/lotus/rpc.go
+++ b/cmd/lotus/rpc.go
@@ -15,12 +15,9 @@ import (
 	logging "github.com/ipfs/go-log/v2"
 	"github.com/multiformats/go-multiaddr"
 	manet "github.com/multiformats/go-multiaddr/net"
-	promclient "github.com/prometheus/client_golang/prometheus"
 	"go.opencensus.io/tag"
 	"golang.org/x/xerrors"
 
-	"contrib.go.opencensus.io/exporter/prometheus"
-
 	"github.com/filecoin-project/go-jsonrpc"
 	"github.com/filecoin-project/go-jsonrpc/auth"
@@ -55,23 +52,7 @@ func serveRPC(a api.FullNode, stop node.StopFunc, addr multiaddr.Multiaddr, shutdownCh <-chan struct{}) error {
 
 	http.Handle("/rest/v0/import", importAH)
 
-	// Prometheus globals are exposed as interfaces, but the prometheus
-	// OpenCensus exporter expects a concrete *Registry. The concrete type of
-	// the globals are actually *Registry, so we downcast them, staying
-	// defensive in case things change under the hood.
-	registry, ok := promclient.DefaultRegisterer.(*promclient.Registry)
-	if !ok {
-		log.Warnf("failed to export default prometheus registry; some metrics will be unavailable; unexpected type: %T", promclient.DefaultRegisterer)
-	}
-	exporter, err := prometheus.NewExporter(prometheus.Options{
-		Registry:  registry,
-		Namespace: "lotus",
-	})
-	if err != nil {
-		log.Fatalf("could not create the prometheus stats exporter: %v", err)
-	}
-
-	http.Handle("/debug/metrics", exporter)
+	http.Handle("/debug/metrics", metrics.Exporter())
 
 	http.Handle("/debug/pprof-set/block", handleFractionOpt("BlockProfileRate", runtime.SetBlockProfileRate))
 	http.Handle("/debug/pprof-set/mutex", handleFractionOpt("MutexProfileFraction",
 		func(x int) { runtime.SetMutexProfileFraction(x) },
diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go
index a9b31f38a..3db7ac9ec 100644
--- a/extern/sector-storage/manager.go
+++ b/extern/sector-storage/manager.go
@@ -632,47 +632,47 @@ func (m *Manager) Remove(ctx context.Context, sector storage.SectorRef) error {
 }
 
 func (m *Manager) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error {
-	return m.returnResult(callID, pi, err)
+	return m.returnResult(ctx, callID, pi, err)
 }
 
 func (m *Manager) ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err *storiface.CallError) error {
-	return m.returnResult(callID, p1o, err)
+	return m.returnResult(ctx, callID, p1o, err)
 }
 
 func (m *Manager) ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err *storiface.CallError) error {
-	return m.returnResult(callID, sealed, err)
+	return m.returnResult(ctx, callID, sealed, err)
 }
 
 func (m *Manager) ReturnSealCommit1(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err *storiface.CallError) error {
-	return m.returnResult(callID, out, err)
+	return m.returnResult(ctx, callID, out, err)
 }
 
 func (m *Manager) ReturnSealCommit2(ctx context.Context, callID storiface.CallID, proof storage.Proof, err *storiface.CallError) error {
-	return m.returnResult(callID, proof, err)
+	return m.returnResult(ctx, callID, proof, err)
 }
 
 func (m *Manager) ReturnFinalizeSector(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
-	return m.returnResult(callID, nil, err)
+	return m.returnResult(ctx, callID, nil, err)
 }
 
 func (m *Manager) ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
-	return m.returnResult(callID, nil, err)
+	return m.returnResult(ctx, callID, nil, err)
 }
 
 func (m *Manager) ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
-	return m.returnResult(callID, nil, err)
+	return m.returnResult(ctx, callID, nil, err)
 }
 
 func (m *Manager) ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
-	return m.returnResult(callID, nil, err)
+	return m.returnResult(ctx, callID, nil, err)
 }
 
 func (m *Manager) ReturnReadPiece(ctx context.Context, callID storiface.CallID, ok bool, err *storiface.CallError) error {
-	return m.returnResult(callID, ok, err)
+	return m.returnResult(ctx, callID, ok, err)
 }
 
 func (m *Manager) ReturnFetch(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
-	return m.returnResult(callID, nil, err)
+	return m.returnResult(ctx, callID, nil, err)
 }
 
 func (m *Manager) StorageLocal(ctx context.Context) (map[stores.ID]string, error) {
diff --git a/extern/sector-storage/manager_calltracker.go b/extern/sector-storage/manager_calltracker.go
index c3b2e3190..332a08817 100644
--- a/extern/sector-storage/manager_calltracker.go
+++ b/extern/sector-storage/manager_calltracker.go
@@ -349,7 +349,7 @@ func (m *Manager) waitCall(ctx context.Context, callID storiface.CallID) (interface{}, error) {
 	}
 }
 
-func (m *Manager) returnResult(callID storiface.CallID, r interface{}, cerr *storiface.CallError) error {
+func (m *Manager) returnResult(ctx context.Context, callID storiface.CallID, r interface{}, cerr *storiface.CallError) error {
 	res := result{
 		r: r,
 	}
@@ -357,7 +357,7 @@ func (m *Manager) returnResult(callID storiface.CallID, r interface{}, cerr *storiface.CallError) error {
 		res.err = cerr
 	}
 
-	m.sched.workTracker.onDone(callID)
+	m.sched.workTracker.onDone(ctx, callID)
 
 	m.workLk.Lock()
 	defer m.workLk.Unlock()
@@ -413,5 +413,5 @@ func (m *Manager) returnResult(callID storiface.CallID, r interface{}, cerr *storiface.CallError) error {
 
 func (m *Manager) Abort(ctx context.Context, call storiface.CallID) error {
 	// TODO: Allow temp error
-	return m.returnResult(call, nil, storiface.Err(storiface.ErrUnknown, xerrors.New("task aborted")))
+	return m.returnResult(ctx, call, nil, storiface.Err(storiface.ErrUnknown, xerrors.New("task aborted")))
 }
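Threading ctx through returnResult is what makes the new onDone metrics work: OpenCensus tags travel in the context, so whatever was tagged upstream is still attached when the measurement is finally recorded. A hedged sketch of that propagation, with toy names in place of the lotus measures:

package main

import (
	"context"

	"go.opencensus.io/stats"
	"go.opencensus.io/tag"
)

var (
	// Toy stand-ins; the real code records the WorkerCallsReturned* measures in onDone.
	hostKey, _ = tag.NewKey("worker_hostname")
	returned   = stats.Int64("calls_returned", "Returned calls", stats.UnitDimensionless)
)

// The caller attaches tags once...
func handleReturn(ctx context.Context) {
	ctx, _ = tag.New(ctx, tag.Upsert(hostKey, "worker-01"))
	finish(ctx)
}

// ...and stats.Record further down the call chain still sees them,
// which is exactly why returnResult and onDone now take a context.
func finish(ctx context.Context) {
	stats.Record(ctx, returned.M(1))
}

func main() {
	handleReturn(context.Background())
}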
diff --git a/extern/sector-storage/sched_worker.go b/extern/sector-storage/sched_worker.go
index 40cf2fcf4..4e18e5c6f 100644
--- a/extern/sector-storage/sched_worker.go
+++ b/extern/sector-storage/sched_worker.go
@@ -398,7 +398,7 @@ func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRequest) error {
 
 	go func() {
 		// first run the prepare step (e.g. fetching sector data from other worker)
-		err := req.prepare(req.ctx, sh.workTracker.worker(sw.wid, w.workerRpc))
+		err := req.prepare(req.ctx, sh.workTracker.worker(sw.wid, w.info, w.workerRpc))
 		sh.workersLk.Lock()
 
 		if err != nil {
@@ -437,7 +437,7 @@ func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRequest) error {
 		}
 
 		// Do the work!
-		err = req.work(req.ctx, sh.workTracker.worker(sw.wid, w.workerRpc))
+		err = req.work(req.ctx, sh.workTracker.worker(sw.wid, w.info, w.workerRpc))
 
 		select {
 		case req.ret <- workerResponse{err: err}:
diff --git a/extern/sector-storage/worker_tracked.go b/extern/sector-storage/worker_tracked.go
index febb190c5..aeb3eea74 100644
--- a/extern/sector-storage/worker_tracked.go
+++ b/extern/sector-storage/worker_tracked.go
@@ -7,17 +7,21 @@ import (
 	"time"
 
 	"github.com/ipfs/go-cid"
+	"go.opencensus.io/stats"
+	"go.opencensus.io/tag"
 
 	"github.com/filecoin-project/go-state-types/abi"
 	"github.com/filecoin-project/specs-storage/storage"
 
 	"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
 	"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
+	"github.com/filecoin-project/lotus/metrics"
 )
 
 type trackedWork struct {
-	job    storiface.WorkerJob
-	worker WorkerID
+	job            storiface.WorkerJob
+	worker         WorkerID
+	workerHostname string
 }
 
 type workTracker struct {
@@ -29,20 +33,31 @@ type workTracker struct {
 	// TODO: done, aggregate stats, queue stats, scheduler feedback
 }
 
-func (wt *workTracker) onDone(callID storiface.CallID) {
+func (wt *workTracker) onDone(ctx context.Context, callID storiface.CallID) {
 	wt.lk.Lock()
 	defer wt.lk.Unlock()
 
-	_, ok := wt.running[callID]
+	t, ok := wt.running[callID]
 	if !ok {
 		wt.done[callID] = struct{}{}
+
+		stats.Record(ctx, metrics.WorkerUntrackedCallsReturned.M(1))
 		return
 	}
 
+	took := metrics.SinceInMilliseconds(t.job.Start)
+
+	ctx, _ = tag.New(
+		ctx,
+		tag.Upsert(metrics.TaskType, string(t.job.Task)),
+		tag.Upsert(metrics.WorkerHostname, t.workerHostname),
+	)
+	stats.Record(ctx, metrics.WorkerCallsReturnedCount.M(1), metrics.WorkerCallsReturnedDuration.M(took))
+
 	delete(wt.running, callID)
 }
 
-func (wt *workTracker) track(wid WorkerID, sid storage.SectorRef, task sealtasks.TaskType) func(storiface.CallID, error) (storiface.CallID, error) {
+func (wt *workTracker) track(ctx context.Context, wid WorkerID, wi storiface.WorkerInfo, sid storage.SectorRef, task sealtasks.TaskType) func(storiface.CallID, error) (storiface.CallID, error) {
 	return func(callID storiface.CallID, err error) (storiface.CallID, error) {
 		if err != nil {
 			return callID, err
@@ -64,17 +79,26 @@ func (wt *workTracker) track(wid WorkerID, sid storage.SectorRef, task sealtasks.TaskType) func(storiface.CallID, error) (storiface.CallID, error) {
 				Task:   task,
 				Start:  time.Now(),
 			},
-			worker: wid,
+			worker:         wid,
+			workerHostname: wi.Hostname,
 		}
 
+		ctx, _ = tag.New(
+			ctx,
+			tag.Upsert(metrics.TaskType, string(task)),
+			tag.Upsert(metrics.WorkerHostname, wi.Hostname),
+		)
+		stats.Record(ctx, metrics.WorkerCallsStarted.M(1))
+
 		return callID, err
 	}
 }
 
-func (wt *workTracker) worker(wid WorkerID, w Worker) Worker {
+func (wt *workTracker) worker(wid WorkerID, wi storiface.WorkerInfo, w Worker) Worker {
 	return &trackedWorker{
-		Worker: w,
-		wid:    wid,
+		Worker:     w,
+		wid:        wid,
+		workerInfo: wi,
 
 		tracker: wt,
 	}
@@ -94,45 +118,46 @@ func (wt *workTracker) Running() []trackedWork {
 
 type trackedWorker struct {
 	Worker
-	wid WorkerID
+	wid        WorkerID
+	workerInfo storiface.WorkerInfo
 
 	tracker *workTracker
 }
 
 func (t *trackedWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
-	return t.tracker.track(t.wid, sector, sealtasks.TTPreCommit1)(t.Worker.SealPreCommit1(ctx, sector, ticket, pieces))
+	return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTPreCommit1)(t.Worker.SealPreCommit1(ctx, sector, ticket, pieces))
 }
 
 func (t *trackedWorker) SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (storiface.CallID, error) {
-	return t.tracker.track(t.wid, sector, sealtasks.TTPreCommit2)(t.Worker.SealPreCommit2(ctx, sector, pc1o))
+	return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTPreCommit2)(t.Worker.SealPreCommit2(ctx, sector, pc1o))
 }
 
 func (t *trackedWorker) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) {
-	return t.tracker.track(t.wid, sector, sealtasks.TTCommit1)(t.Worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids))
+	return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTCommit1)(t.Worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids))
 }
 
 func (t *trackedWorker) SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (storiface.CallID, error) {
-	return t.tracker.track(t.wid, sector, sealtasks.TTCommit2)(t.Worker.SealCommit2(ctx, sector, c1o))
+	return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTCommit2)(t.Worker.SealCommit2(ctx, sector, c1o))
 }
 
 func (t *trackedWorker) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) {
-	return t.tracker.track(t.wid, sector, sealtasks.TTFinalize)(t.Worker.FinalizeSector(ctx, sector, keepUnsealed))
+	return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTFinalize)(t.Worker.FinalizeSector(ctx, sector, keepUnsealed))
 }
 
 func (t *trackedWorker) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
-	return t.tracker.track(t.wid, sector, sealtasks.TTAddPiece)(t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData))
+	return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTAddPiece)(t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData))
 }
 
 func (t *trackedWorker) Fetch(ctx context.Context, s storage.SectorRef, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
-	return t.tracker.track(t.wid, s, sealtasks.TTFetch)(t.Worker.Fetch(ctx, s, ft, ptype, am))
+	return t.tracker.track(ctx, t.wid, t.workerInfo, s, sealtasks.TTFetch)(t.Worker.Fetch(ctx, s, ft, ptype, am))
 }
 
 func (t *trackedWorker) UnsealPiece(ctx context.Context, id storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) {
-	return t.tracker.track(t.wid, id, sealtasks.TTUnseal)(t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid))
+	return t.tracker.track(ctx, t.wid, t.workerInfo, id, sealtasks.TTUnseal)(t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid))
 }
 
 func (t *trackedWorker) ReadPiece(ctx context.Context, writer io.Writer, id storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) {
-	return t.tracker.track(t.wid, id, sealtasks.TTReadUnsealed)(t.Worker.ReadPiece(ctx, writer, id, index, size))
+	return t.tracker.track(ctx, t.wid, t.workerInfo, id, sealtasks.TTReadUnsealed)(t.Worker.ReadPiece(ctx, writer, id, index, size))
 }
 
 var _ Worker = &trackedWorker{}
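The track method used above returns a closure that is applied immediately to the result of the wrapped Worker call, so a single expression both starts the clock and intercepts the (CallID, error) pair. A stripped-down sketch of that decorator shape (types simplified, not the lotus API):

package main

import (
	"fmt"
	"time"
)

type callID string

// track notes the start time, then hands back a decorator for the call result.
func track(task string) func(callID, error) (callID, error) {
	start := time.Now()
	return func(id callID, err error) (callID, error) {
		if err != nil {
			return id, err // failed submissions are not tracked
		}
		fmt.Printf("task %s submitted as %s after %s\n", task, id, time.Since(start))
		return id, err
	}
}

func submit() (callID, error) { return "call-1", nil }

func main() {
	// One expression wraps the call site, mirroring
	// t.tracker.track(ctx, ...)(t.Worker.SealPreCommit1(ctx, ...)).
	id, err := track("PC1")(submit())
	fmt.Println(id, err)
}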
diff --git a/metrics/exporter.go b/metrics/exporter.go
new file mode 100644
index 000000000..92786c26b
--- /dev/null
+++ b/metrics/exporter.go
@@ -0,0 +1,32 @@
+package metrics
+
+import (
+	"net/http"
+	_ "net/http/pprof"
+
+	"contrib.go.opencensus.io/exporter/prometheus"
+	logging "github.com/ipfs/go-log/v2"
+	promclient "github.com/prometheus/client_golang/prometheus"
+)
+
+var log = logging.Logger("metrics")
+
+func Exporter() http.Handler {
+	// Prometheus globals are exposed as interfaces, but the prometheus
+	// OpenCensus exporter expects a concrete *Registry. The concrete type of
+	// the globals are actually *Registry, so we downcast them, staying
+	// defensive in case things change under the hood.
+	registry, ok := promclient.DefaultRegisterer.(*promclient.Registry)
+	if !ok {
+		log.Warnf("failed to export default prometheus registry; some metrics will be unavailable; unexpected type: %T", promclient.DefaultRegisterer)
+	}
+	exporter, err := prometheus.NewExporter(prometheus.Options{
+		Registry:  registry,
+		Namespace: "lotus",
+	})
+	if err != nil {
+		log.Errorf("could not create the prometheus stats exporter: %v", err)
+	}
+
+	return exporter
+}
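Exporter() is meant to be mounted on whatever mux a node already serves; the handler emits the Prometheus text format, and the set of registered views decides what appears in it. A minimal usage sketch (the import path is the real lotus package; the listen address is arbitrary):

package main

import (
	"net/http"

	"go.opencensus.io/stats/view"

	"github.com/filecoin-project/lotus/metrics"
)

func main() {
	// Register views first; measures without a registered view are never exported.
	if err := view.Register(metrics.MinerNodeViews...); err != nil {
		panic(err)
	}

	mux := http.NewServeMux()
	mux.Handle("/debug/metrics", metrics.Exporter())
	// Scrape with e.g.: curl http://localhost:6060/debug/metrics
	_ = http.ListenAndServe("localhost:6060", mux)
}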
stats.Int64("chain/node_height_expected", "Expected Height of the node", stats.UnitDimensionless) ChainNodeWorkerHeight = stats.Int64("chain/node_worker_height", "Current Height of workers on the node", stats.UnitDimensionless) @@ -48,7 +67,6 @@ var ( BlockValidationSuccess = stats.Int64("block/success", "Counter for block validation successes", stats.UnitDimensionless) BlockValidationDurationMilliseconds = stats.Float64("block/validation_ms", "Duration for Block Validation in ms", stats.UnitMilliseconds) BlockDelay = stats.Int64("block/delay", "Delay of accepted blocks, where delay is >5s", stats.UnitMilliseconds) - PeerCount = stats.Int64("peer/count", "Current number of FIL peers", stats.UnitDimensionless) PubsubPublishMessage = stats.Int64("pubsub/published", "Counter for total published messages", stats.UnitDimensionless) PubsubDeliverMessage = stats.Int64("pubsub/delivered", "Counter for total delivered messages", stats.UnitDimensionless) PubsubRejectMessage = stats.Int64("pubsub/rejected", "Counter for total rejected messages", stats.UnitDimensionless) @@ -56,9 +74,14 @@ var ( PubsubRecvRPC = stats.Int64("pubsub/recv_rpc", "Counter for total received RPCs", stats.UnitDimensionless) PubsubSendRPC = stats.Int64("pubsub/send_rpc", "Counter for total sent RPCs", stats.UnitDimensionless) PubsubDropRPC = stats.Int64("pubsub/drop_rpc", "Counter for total dropped RPCs", stats.UnitDimensionless) - APIRequestDuration = stats.Float64("api/request_duration_ms", "Duration of API requests", stats.UnitMilliseconds) VMFlushCopyDuration = stats.Float64("vm/flush_copy_ms", "Time spent in VM Flush Copy", stats.UnitMilliseconds) VMFlushCopyCount = stats.Int64("vm/flush_copy_count", "Number of copied objects", stats.UnitDimensionless) + + // miner + WorkerCallsStarted = stats.Int64("sealing/worker_calls_started", "Counter of started worker tasks", stats.UnitDimensionless) + WorkerCallsReturnedCount = stats.Int64("sealing/worker_calls_returned_count", "Counter of returned worker tasks", stats.UnitDimensionless) + WorkerCallsReturnedDuration = stats.Float64("sealing/worker_calls_returned_ms", "Counter of returned worker tasks", stats.UnitMilliseconds) + WorkerUntrackedCallsReturned = stats.Int64("sealing/worker_untracked_calls_returned", "Counter of returned untracked worker tasks", stats.UnitDimensionless) ) var ( @@ -178,41 +201,72 @@ var ( Measure: VMFlushCopyCount, Aggregation: view.Sum(), } + + // miner + WorkerCallsStartedView = &view.View{ + Measure: WorkerCallsStarted, + Aggregation: view.Count(), + TagKeys: []tag.Key{TaskType, WorkerHostname}, + } + WorkerCallsReturnedCountView = &view.View{ + Measure: WorkerCallsReturnedCount, + Aggregation: view.Count(), + TagKeys: []tag.Key{TaskType, WorkerHostname}, + } + WorkerUntrackedCallsReturnedView = &view.View{ + Measure: WorkerUntrackedCallsReturned, + Aggregation: view.Count(), + } + WorkerCallsReturnedDurationView = &view.View{ + Measure: WorkerCallsReturnedDuration, + Aggregation: workMillisecondsDistribution, + TagKeys: []tag.Key{TaskType, WorkerHostname}, + } ) // DefaultViews is an array of OpenCensus views for metric gathering purposes var DefaultViews = func() []*view.View { views := []*view.View{ InfoView, - ChainNodeHeightView, - ChainNodeHeightExpectedView, - ChainNodeWorkerHeightView, - BlockReceivedView, - BlockValidationFailureView, - BlockValidationSuccessView, - BlockValidationDurationView, - BlockDelayView, - MessagePublishedView, - MessageReceivedView, - MessageValidationFailureView, - MessageValidationSuccessView, PeerCountView, - 
@@ -178,41 +201,72 @@ var (
 		Measure:     VMFlushCopyCount,
 		Aggregation: view.Sum(),
 	}
+
+	// miner
+	WorkerCallsStartedView = &view.View{
+		Measure:     WorkerCallsStarted,
+		Aggregation: view.Count(),
+		TagKeys:     []tag.Key{TaskType, WorkerHostname},
+	}
+	WorkerCallsReturnedCountView = &view.View{
+		Measure:     WorkerCallsReturnedCount,
+		Aggregation: view.Count(),
+		TagKeys:     []tag.Key{TaskType, WorkerHostname},
+	}
+	WorkerUntrackedCallsReturnedView = &view.View{
+		Measure:     WorkerUntrackedCallsReturned,
+		Aggregation: view.Count(),
+	}
+	WorkerCallsReturnedDurationView = &view.View{
+		Measure:     WorkerCallsReturnedDuration,
+		Aggregation: workMillisecondsDistribution,
+		TagKeys:     []tag.Key{TaskType, WorkerHostname},
+	}
 )
 
 // DefaultViews is an array of OpenCensus views for metric gathering purposes
 var DefaultViews = func() []*view.View {
 	views := []*view.View{
 		InfoView,
-		ChainNodeHeightView,
-		ChainNodeHeightExpectedView,
-		ChainNodeWorkerHeightView,
-		BlockReceivedView,
-		BlockValidationFailureView,
-		BlockValidationSuccessView,
-		BlockValidationDurationView,
-		BlockDelayView,
-		MessagePublishedView,
-		MessageReceivedView,
-		MessageValidationFailureView,
-		MessageValidationSuccessView,
 		PeerCountView,
-		PubsubPublishMessageView,
-		PubsubDeliverMessageView,
-		PubsubRejectMessageView,
-		PubsubDuplicateMessageView,
-		PubsubRecvRPCView,
-		PubsubSendRPCView,
-		PubsubDropRPCView,
 		APIRequestDurationView,
-		VMFlushCopyCountView,
-		VMFlushCopyDurationView,
 	}
 	views = append(views, blockstore.DefaultViews...)
 	views = append(views, rpcmetrics.DefaultViews...)
 	return views
 }()
 
+var ChainNodeViews = append([]*view.View{
+	ChainNodeHeightView,
+	ChainNodeHeightExpectedView,
+	ChainNodeWorkerHeightView,
+	BlockReceivedView,
+	BlockValidationFailureView,
+	BlockValidationSuccessView,
+	BlockValidationDurationView,
+	BlockDelayView,
+	MessagePublishedView,
+	MessageReceivedView,
+	MessageValidationFailureView,
+	MessageValidationSuccessView,
+	PubsubPublishMessageView,
+	PubsubDeliverMessageView,
+	PubsubRejectMessageView,
+	PubsubDuplicateMessageView,
+	PubsubRecvRPCView,
+	PubsubSendRPCView,
+	PubsubDropRPCView,
+	VMFlushCopyCountView,
+	VMFlushCopyDurationView,
+}, DefaultViews...)
+
+var MinerNodeViews = append([]*view.View{
+	WorkerCallsStartedView,
+	WorkerCallsReturnedCountView,
+	WorkerUntrackedCallsReturnedView,
+	WorkerCallsReturnedDurationView,
+}, DefaultViews...)
+
 // SinceInMilliseconds returns the duration of time since the provided time as a float64.
 func SinceInMilliseconds(startTime time.Time) float64 {
 	return float64(time.Since(startTime).Nanoseconds()) / 1e6
 }
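The workMillisecondsDistribution buckets above are minutes expressed in milliseconds, sized to sealing reality: seconds for fetches, tens of minutes for PC2/C2, hours for PC1. To make the record path concrete, a hedged sketch of how one returned task lands in that histogram, mirroring onDone (the task name and hostname below are illustrative):

package main

import (
	"context"
	"time"

	"go.opencensus.io/stats"
	"go.opencensus.io/stats/view"
	"go.opencensus.io/tag"

	"github.com/filecoin-project/lotus/metrics"
)

func main() {
	// Assumes the miner views are registered, as the miner run command does.
	_ = view.Register(metrics.MinerNodeViews...)

	start := time.Now()
	time.Sleep(50 * time.Millisecond) // stand-in for the real sealing work

	// Mirror onDone: tag the context, then record count and duration together.
	ctx, _ := tag.New(context.Background(),
		tag.Upsert(metrics.TaskType, "seal/v0/precommit/1"), // illustrative task name
		tag.Upsert(metrics.WorkerHostname, "worker-01"),     // illustrative hostname
	)
	stats.Record(ctx,
		metrics.WorkerCallsReturnedCount.M(1),
		metrics.WorkerCallsReturnedDuration.M(metrics.SinceInMilliseconds(start)),
	)
}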