Collect worker task metrics

This commit is contained in:
Łukasz Magiera 2021-02-21 11:03:00 +01:00
parent e05dc4ec80
commit fe230f901e
10 changed files with 202 additions and 99 deletions

View File

@ -77,7 +77,7 @@ var runCmd = &cli.Command{
// Register all metric views
if err := view.Register(
metrics.DefaultViews...,
metrics.ChainNodeViews...,
); err != nil {
log.Fatalf("Cannot register the view: %v", err)
}

View File

@ -13,6 +13,7 @@ import (
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/urfave/cli/v2"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"golang.org/x/xerrors"
@ -68,14 +69,20 @@ var runCmd = &cli.Command{
return xerrors.Errorf("getting full node api: %w", err)
}
defer ncloser()
ctx := lcli.DaemonContext(cctx)
ctx, _ := tag.New(lcli.DaemonContext(cctx),
tag.Insert(metrics.Version, build.BuildVersion),
tag.Insert(metrics.Commit, build.CurrentCommit),
tag.Insert(metrics.NodeType, "miner"),
)
// Register all metric views
if err := view.Register(
metrics.DefaultViews...,
if err = view.Register(
metrics.MinerNodeViews...,
); err != nil {
log.Fatalf("Cannot register the view: %v", err)
}
// Set the metric to one so it is published to the exporter
stats.Record(ctx, metrics.LotusInfo.M(1))
v, err := nodeApi.Version(ctx)
if err != nil {
@ -162,6 +169,7 @@ var runCmd = &cli.Command{
mux.Handle("/rpc/v0", rpcServer)
mux.PathPrefix("/remote").HandlerFunc(minerapi.(*impl.StorageMinerAPI).ServeRemote)
mux.Handle("/debug/metrics", metrics.Exporter())
mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof
ah := &auth.Handler{

View File

@ -192,7 +192,20 @@ var DaemonCmd = &cli.Command{
return fmt.Errorf("unrecognized profile type: %q", profile)
}
ctx, _ := tag.New(context.Background(), tag.Insert(metrics.Version, build.BuildVersion), tag.Insert(metrics.Commit, build.CurrentCommit))
ctx, _ := tag.New(context.Background(),
tag.Insert(metrics.Version, build.BuildVersion),
tag.Insert(metrics.Commit, build.CurrentCommit),
tag.Insert(metrics.NodeType, "chain"),
)
// Register all metric views
if err = view.Register(
metrics.ChainNodeViews...,
); err != nil {
log.Fatalf("Cannot register the view: %v", err)
}
// Set the metric to one so it is published to the exporter
stats.Record(ctx, metrics.LotusInfo.M(1))
{
dir, err := homedir.Expand(cctx.String("repo"))
if err != nil {
@ -332,16 +345,6 @@ var DaemonCmd = &cli.Command{
}
}
// Register all metric views
if err = view.Register(
metrics.DefaultViews...,
); err != nil {
log.Fatalf("Cannot register the view: %v", err)
}
// Set the metric to one so it is published to the exporter
stats.Record(ctx, metrics.LotusInfo.M(1))
endpoint, err := r.APIEndpoint()
if err != nil {
return xerrors.Errorf("getting api endpoint: %w", err)

View File

@ -15,12 +15,9 @@ import (
logging "github.com/ipfs/go-log/v2"
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
promclient "github.com/prometheus/client_golang/prometheus"
"go.opencensus.io/tag"
"golang.org/x/xerrors"
"contrib.go.opencensus.io/exporter/prometheus"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-jsonrpc/auth"
@ -55,23 +52,7 @@ func serveRPC(a api.FullNode, stop node.StopFunc, addr multiaddr.Multiaddr, shut
http.Handle("/rest/v0/import", importAH)
// Prometheus globals are exposed as interfaces, but the prometheus
// OpenCensus exporter expects a concrete *Registry. The concrete type of
// the globals are actually *Registry, so we downcast them, staying
// defensive in case things change under the hood.
registry, ok := promclient.DefaultRegisterer.(*promclient.Registry)
if !ok {
log.Warnf("failed to export default prometheus registry; some metrics will be unavailable; unexpected type: %T", promclient.DefaultRegisterer)
}
exporter, err := prometheus.NewExporter(prometheus.Options{
Registry: registry,
Namespace: "lotus",
})
if err != nil {
log.Fatalf("could not create the prometheus stats exporter: %v", err)
}
http.Handle("/debug/metrics", exporter)
http.Handle("/debug/metrics", metrics.Exporter())
http.Handle("/debug/pprof-set/block", handleFractionOpt("BlockProfileRate", runtime.SetBlockProfileRate))
http.Handle("/debug/pprof-set/mutex", handleFractionOpt("MutexProfileFraction",
func(x int) { runtime.SetMutexProfileFraction(x) },

View File

@ -632,47 +632,47 @@ func (m *Manager) Remove(ctx context.Context, sector storage.SectorRef) error {
}
func (m *Manager) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error {
return m.returnResult(callID, pi, err)
return m.returnResult(ctx, callID, pi, err)
}
func (m *Manager) ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err *storiface.CallError) error {
return m.returnResult(callID, p1o, err)
return m.returnResult(ctx, callID, p1o, err)
}
func (m *Manager) ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err *storiface.CallError) error {
return m.returnResult(callID, sealed, err)
return m.returnResult(ctx, callID, sealed, err)
}
func (m *Manager) ReturnSealCommit1(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err *storiface.CallError) error {
return m.returnResult(callID, out, err)
return m.returnResult(ctx, callID, out, err)
}
func (m *Manager) ReturnSealCommit2(ctx context.Context, callID storiface.CallID, proof storage.Proof, err *storiface.CallError) error {
return m.returnResult(callID, proof, err)
return m.returnResult(ctx, callID, proof, err)
}
func (m *Manager) ReturnFinalizeSector(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(callID, nil, err)
return m.returnResult(ctx, callID, nil, err)
}
func (m *Manager) ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(callID, nil, err)
return m.returnResult(ctx, callID, nil, err)
}
func (m *Manager) ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(callID, nil, err)
return m.returnResult(ctx, callID, nil, err)
}
func (m *Manager) ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(callID, nil, err)
return m.returnResult(ctx, callID, nil, err)
}
func (m *Manager) ReturnReadPiece(ctx context.Context, callID storiface.CallID, ok bool, err *storiface.CallError) error {
return m.returnResult(callID, ok, err)
return m.returnResult(ctx, callID, ok, err)
}
func (m *Manager) ReturnFetch(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(callID, nil, err)
return m.returnResult(ctx, callID, nil, err)
}
func (m *Manager) StorageLocal(ctx context.Context) (map[stores.ID]string, error) {

View File

@ -349,7 +349,7 @@ func (m *Manager) waitCall(ctx context.Context, callID storiface.CallID) (interf
}
}
func (m *Manager) returnResult(callID storiface.CallID, r interface{}, cerr *storiface.CallError) error {
func (m *Manager) returnResult(ctx context.Context, callID storiface.CallID, r interface{}, cerr *storiface.CallError) error {
res := result{
r: r,
}
@ -357,7 +357,7 @@ func (m *Manager) returnResult(callID storiface.CallID, r interface{}, cerr *sto
res.err = cerr
}
m.sched.workTracker.onDone(callID)
m.sched.workTracker.onDone(ctx, callID)
m.workLk.Lock()
defer m.workLk.Unlock()
@ -413,5 +413,5 @@ func (m *Manager) returnResult(callID storiface.CallID, r interface{}, cerr *sto
func (m *Manager) Abort(ctx context.Context, call storiface.CallID) error {
// TODO: Allow temp error
return m.returnResult(call, nil, storiface.Err(storiface.ErrUnknown, xerrors.New("task aborted")))
return m.returnResult(ctx, call, nil, storiface.Err(storiface.ErrUnknown, xerrors.New("task aborted")))
}

View File

@ -398,7 +398,7 @@ func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRe
go func() {
// first run the prepare step (e.g. fetching sector data from other worker)
err := req.prepare(req.ctx, sh.workTracker.worker(sw.wid, w.workerRpc))
err := req.prepare(req.ctx, sh.workTracker.worker(sw.wid, w.info, w.workerRpc))
sh.workersLk.Lock()
if err != nil {
@ -437,7 +437,7 @@ func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRe
}
// Do the work!
err = req.work(req.ctx, sh.workTracker.worker(sw.wid, w.workerRpc))
err = req.work(req.ctx, sh.workTracker.worker(sw.wid, w.info, w.workerRpc))
select {
case req.ret <- workerResponse{err: err}:

View File

@ -7,17 +7,21 @@ import (
"time"
"github.com/ipfs/go-cid"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"github.com/filecoin-project/lotus/metrics"
)
type trackedWork struct {
job storiface.WorkerJob
worker WorkerID
job storiface.WorkerJob
worker WorkerID
workerHostname string
}
type workTracker struct {
@ -29,20 +33,31 @@ type workTracker struct {
// TODO: done, aggregate stats, queue stats, scheduler feedback
}
func (wt *workTracker) onDone(callID storiface.CallID) {
func (wt *workTracker) onDone(ctx context.Context, callID storiface.CallID) {
wt.lk.Lock()
defer wt.lk.Unlock()
_, ok := wt.running[callID]
t, ok := wt.running[callID]
if !ok {
wt.done[callID] = struct{}{}
stats.Record(ctx, metrics.WorkerUntrackedCallsReturned.M(1))
return
}
took := metrics.SinceInMilliseconds(t.job.Start)
ctx, _ = tag.New(
ctx,
tag.Upsert(metrics.TaskType, string(t.job.Task)),
tag.Upsert(metrics.WorkerHostname, t.workerHostname),
)
stats.Record(ctx, metrics.WorkerCallsReturnedCount.M(1), metrics.WorkerCallsReturnedDuration.M(took))
delete(wt.running, callID)
}
func (wt *workTracker) track(wid WorkerID, sid storage.SectorRef, task sealtasks.TaskType) func(storiface.CallID, error) (storiface.CallID, error) {
func (wt *workTracker) track(ctx context.Context, wid WorkerID, wi storiface.WorkerInfo, sid storage.SectorRef, task sealtasks.TaskType) func(storiface.CallID, error) (storiface.CallID, error) {
return func(callID storiface.CallID, err error) (storiface.CallID, error) {
if err != nil {
return callID, err
@ -64,17 +79,26 @@ func (wt *workTracker) track(wid WorkerID, sid storage.SectorRef, task sealtasks
Task: task,
Start: time.Now(),
},
worker: wid,
worker: wid,
workerHostname: wi.Hostname,
}
ctx, _ = tag.New(
ctx,
tag.Upsert(metrics.TaskType, string(task)),
tag.Upsert(metrics.WorkerHostname, wi.Hostname),
)
stats.Record(ctx, metrics.WorkerCallsStarted.M(1))
return callID, err
}
}
func (wt *workTracker) worker(wid WorkerID, w Worker) Worker {
func (wt *workTracker) worker(wid WorkerID, wi storiface.WorkerInfo, w Worker) Worker {
return &trackedWorker{
Worker: w,
wid: wid,
Worker: w,
wid: wid,
workerInfo: wi,
tracker: wt,
}
@ -94,45 +118,46 @@ func (wt *workTracker) Running() []trackedWork {
type trackedWorker struct {
Worker
wid WorkerID
wid WorkerID
workerInfo storiface.WorkerInfo
tracker *workTracker
}
func (t *trackedWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
return t.tracker.track(t.wid, sector, sealtasks.TTPreCommit1)(t.Worker.SealPreCommit1(ctx, sector, ticket, pieces))
return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTPreCommit1)(t.Worker.SealPreCommit1(ctx, sector, ticket, pieces))
}
func (t *trackedWorker) SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (storiface.CallID, error) {
return t.tracker.track(t.wid, sector, sealtasks.TTPreCommit2)(t.Worker.SealPreCommit2(ctx, sector, pc1o))
return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTPreCommit2)(t.Worker.SealPreCommit2(ctx, sector, pc1o))
}
func (t *trackedWorker) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) {
return t.tracker.track(t.wid, sector, sealtasks.TTCommit1)(t.Worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids))
return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTCommit1)(t.Worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids))
}
func (t *trackedWorker) SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (storiface.CallID, error) {
return t.tracker.track(t.wid, sector, sealtasks.TTCommit2)(t.Worker.SealCommit2(ctx, sector, c1o))
return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTCommit2)(t.Worker.SealCommit2(ctx, sector, c1o))
}
func (t *trackedWorker) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) {
return t.tracker.track(t.wid, sector, sealtasks.TTFinalize)(t.Worker.FinalizeSector(ctx, sector, keepUnsealed))
return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTFinalize)(t.Worker.FinalizeSector(ctx, sector, keepUnsealed))
}
func (t *trackedWorker) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
return t.tracker.track(t.wid, sector, sealtasks.TTAddPiece)(t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData))
return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTAddPiece)(t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData))
}
func (t *trackedWorker) Fetch(ctx context.Context, s storage.SectorRef, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
return t.tracker.track(t.wid, s, sealtasks.TTFetch)(t.Worker.Fetch(ctx, s, ft, ptype, am))
return t.tracker.track(ctx, t.wid, t.workerInfo, s, sealtasks.TTFetch)(t.Worker.Fetch(ctx, s, ft, ptype, am))
}
func (t *trackedWorker) UnsealPiece(ctx context.Context, id storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) {
return t.tracker.track(t.wid, id, sealtasks.TTUnseal)(t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid))
return t.tracker.track(ctx, t.wid, t.workerInfo, id, sealtasks.TTUnseal)(t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid))
}
func (t *trackedWorker) ReadPiece(ctx context.Context, writer io.Writer, id storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) {
return t.tracker.track(t.wid, id, sealtasks.TTReadUnsealed)(t.Worker.ReadPiece(ctx, writer, id, index, size))
return t.tracker.track(ctx, t.wid, t.workerInfo, id, sealtasks.TTReadUnsealed)(t.Worker.ReadPiece(ctx, writer, id, index, size))
}
var _ Worker = &trackedWorker{}

32
metrics/exporter.go Normal file
View File

@ -0,0 +1,32 @@
package metrics
import (
"net/http"
_ "net/http/pprof"
"contrib.go.opencensus.io/exporter/prometheus"
logging "github.com/ipfs/go-log/v2"
promclient "github.com/prometheus/client_golang/prometheus"
)
var log = logging.Logger("metrics")
func Exporter() http.Handler {
// Prometheus globals are exposed as interfaces, but the prometheus
// OpenCensus exporter expects a concrete *Registry. The concrete type of
// the globals are actually *Registry, so we downcast them, staying
// defensive in case things change under the hood.
registry, ok := promclient.DefaultRegisterer.(*promclient.Registry)
if !ok {
log.Warnf("failed to export default prometheus registry; some metrics will be unavailable; unexpected type: %T", promclient.DefaultRegisterer)
}
exporter, err := prometheus.NewExporter(prometheus.Options{
Registry: registry,
Namespace: "lotus",
})
if err != nil {
log.Errorf("could not create the prometheus stats exporter: %v", err)
}
return exporter
}

View File

@ -15,14 +15,24 @@ import (
// Distribution
var defaultMillisecondsDistribution = view.Distribution(0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 3000, 4000, 5000, 7500, 10000, 20000, 50000, 100000)
var workMillisecondsDistribution = view.Distribution(
250, 500, 1000, 2000, 5000, 10_000, 30_000, 60_000, 2*60_000, 5*60_000, 10*60_000, 15*60_000, 30*60_000, // short sealing tasks
40*60_000, 45*60_000, 50*60_000, 55*60_000, 60*60_000, 65*60_000, 70*60_000, 75*60_000, 80*60_000, 85*60_000, 100*60_000, 120*60_000, // PC2 / C2 range
130*60_000, 140*60_000, 150*60_000, 160*60_000, 180*60_000, 200*60_000, 220*60_000, 260*60_000, 300*60_000, // PC1 range
350*60_000, 400*60_000, 600*60_000, 800*60_000, 1000*60_000, 1300*60_000, 1800*60_000, 4000*60_000, 10000*60_000, // intel PC1 range
)
// Global Tags
var (
Version, _ = tag.NewKey("version")
Commit, _ = tag.NewKey("commit")
PeerID, _ = tag.NewKey("peer_id")
MinerID, _ = tag.NewKey("miner_id")
FailureType, _ = tag.NewKey("failure_type")
// common
Version, _ = tag.NewKey("version")
Commit, _ = tag.NewKey("commit")
NodeType, _ = tag.NewKey("node_type")
PeerID, _ = tag.NewKey("peer_id")
MinerID, _ = tag.NewKey("miner_id")
FailureType, _ = tag.NewKey("failure_type")
// chain
Local, _ = tag.NewKey("local")
MessageFrom, _ = tag.NewKey("message_from")
MessageTo, _ = tag.NewKey("message_to")
@ -30,11 +40,20 @@ var (
ReceivedFrom, _ = tag.NewKey("received_from")
Endpoint, _ = tag.NewKey("endpoint")
APIInterface, _ = tag.NewKey("api") // to distinguish between gateway api and full node api endpoint calls
// miner
TaskType, _ = tag.NewKey("task_type")
WorkerHostname, _ = tag.NewKey("worker_hostname")
)
// Measures
var (
LotusInfo = stats.Int64("info", "Arbitrary counter to tag lotus info to", stats.UnitDimensionless)
// common
LotusInfo = stats.Int64("info", "Arbitrary counter to tag lotus info to", stats.UnitDimensionless)
PeerCount = stats.Int64("peer/count", "Current number of FIL peers", stats.UnitDimensionless)
APIRequestDuration = stats.Float64("api/request_duration_ms", "Duration of API requests", stats.UnitMilliseconds)
// chain
ChainNodeHeight = stats.Int64("chain/node_height", "Current Height of the node", stats.UnitDimensionless)
ChainNodeHeightExpected = stats.Int64("chain/node_height_expected", "Expected Height of the node", stats.UnitDimensionless)
ChainNodeWorkerHeight = stats.Int64("chain/node_worker_height", "Current Height of workers on the node", stats.UnitDimensionless)
@ -48,7 +67,6 @@ var (
BlockValidationSuccess = stats.Int64("block/success", "Counter for block validation successes", stats.UnitDimensionless)
BlockValidationDurationMilliseconds = stats.Float64("block/validation_ms", "Duration for Block Validation in ms", stats.UnitMilliseconds)
BlockDelay = stats.Int64("block/delay", "Delay of accepted blocks, where delay is >5s", stats.UnitMilliseconds)
PeerCount = stats.Int64("peer/count", "Current number of FIL peers", stats.UnitDimensionless)
PubsubPublishMessage = stats.Int64("pubsub/published", "Counter for total published messages", stats.UnitDimensionless)
PubsubDeliverMessage = stats.Int64("pubsub/delivered", "Counter for total delivered messages", stats.UnitDimensionless)
PubsubRejectMessage = stats.Int64("pubsub/rejected", "Counter for total rejected messages", stats.UnitDimensionless)
@ -56,9 +74,14 @@ var (
PubsubRecvRPC = stats.Int64("pubsub/recv_rpc", "Counter for total received RPCs", stats.UnitDimensionless)
PubsubSendRPC = stats.Int64("pubsub/send_rpc", "Counter for total sent RPCs", stats.UnitDimensionless)
PubsubDropRPC = stats.Int64("pubsub/drop_rpc", "Counter for total dropped RPCs", stats.UnitDimensionless)
APIRequestDuration = stats.Float64("api/request_duration_ms", "Duration of API requests", stats.UnitMilliseconds)
VMFlushCopyDuration = stats.Float64("vm/flush_copy_ms", "Time spent in VM Flush Copy", stats.UnitMilliseconds)
VMFlushCopyCount = stats.Int64("vm/flush_copy_count", "Number of copied objects", stats.UnitDimensionless)
// miner
WorkerCallsStarted = stats.Int64("sealing/worker_calls_started", "Counter of started worker tasks", stats.UnitDimensionless)
WorkerCallsReturnedCount = stats.Int64("sealing/worker_calls_returned_count", "Counter of returned worker tasks", stats.UnitDimensionless)
WorkerCallsReturnedDuration = stats.Float64("sealing/worker_calls_returned_ms", "Counter of returned worker tasks", stats.UnitMilliseconds)
WorkerUntrackedCallsReturned = stats.Int64("sealing/worker_untracked_calls_returned", "Counter of returned untracked worker tasks", stats.UnitDimensionless)
)
var (
@ -178,41 +201,72 @@ var (
Measure: VMFlushCopyCount,
Aggregation: view.Sum(),
}
// miner
WorkerCallsStartedView = &view.View{
Measure: WorkerCallsStarted,
Aggregation: view.Count(),
TagKeys: []tag.Key{TaskType, WorkerHostname},
}
WorkerCallsReturnedCountView = &view.View{
Measure: WorkerCallsReturnedCount,
Aggregation: view.Count(),
TagKeys: []tag.Key{TaskType, WorkerHostname},
}
WorkerUntrackedCallsReturnedView = &view.View{
Measure: WorkerUntrackedCallsReturned,
Aggregation: view.Count(),
}
WorkerCallsReturnedDurationView = &view.View{
Measure: WorkerCallsReturnedDuration,
Aggregation: workMillisecondsDistribution,
TagKeys: []tag.Key{TaskType, WorkerHostname},
}
)
// DefaultViews is an array of OpenCensus views for metric gathering purposes
var DefaultViews = func() []*view.View {
views := []*view.View{
InfoView,
ChainNodeHeightView,
ChainNodeHeightExpectedView,
ChainNodeWorkerHeightView,
BlockReceivedView,
BlockValidationFailureView,
BlockValidationSuccessView,
BlockValidationDurationView,
BlockDelayView,
MessagePublishedView,
MessageReceivedView,
MessageValidationFailureView,
MessageValidationSuccessView,
PeerCountView,
PubsubPublishMessageView,
PubsubDeliverMessageView,
PubsubRejectMessageView,
PubsubDuplicateMessageView,
PubsubRecvRPCView,
PubsubSendRPCView,
PubsubDropRPCView,
APIRequestDurationView,
VMFlushCopyCountView,
VMFlushCopyDurationView,
}
views = append(views, blockstore.DefaultViews...)
views = append(views, rpcmetrics.DefaultViews...)
return views
}()
var ChainNodeViews = append([]*view.View{
ChainNodeHeightView,
ChainNodeHeightExpectedView,
ChainNodeWorkerHeightView,
BlockReceivedView,
BlockValidationFailureView,
BlockValidationSuccessView,
BlockValidationDurationView,
BlockDelayView,
MessagePublishedView,
MessageReceivedView,
MessageValidationFailureView,
MessageValidationSuccessView,
PubsubPublishMessageView,
PubsubDeliverMessageView,
PubsubRejectMessageView,
PubsubDuplicateMessageView,
PubsubRecvRPCView,
PubsubSendRPCView,
PubsubDropRPCView,
VMFlushCopyCountView,
VMFlushCopyDurationView,
}, DefaultViews...)
var MinerNodeViews = append([]*view.View{
WorkerCallsStartedView,
WorkerCallsReturnedCountView,
WorkerUntrackedCallsReturnedView,
WorkerCallsReturnedDurationView,
}, DefaultViews...)
// SinceInMilliseconds returns the duration of time since the provide time as a float64.
func SinceInMilliseconds(startTime time.Time) float64 {
return float64(time.Since(startTime).Nanoseconds()) / 1e6