Merge pull request #5648 from filecoin-project/feat/miner-worker-metrics

Collect worker task metrics
This commit is contained in:
Łukasz Magiera 2021-03-05 13:04:46 +01:00 committed by GitHub
commit 046eb284b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 202 additions and 99 deletions

View File

@ -77,7 +77,7 @@ var runCmd = &cli.Command{
// Register all metric views // Register all metric views
if err := view.Register( if err := view.Register(
metrics.DefaultViews..., metrics.ChainNodeViews...,
); err != nil { ); err != nil {
log.Fatalf("Cannot register the view: %v", err) log.Fatalf("Cannot register the view: %v", err)
} }

View File

@ -13,6 +13,7 @@ import (
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net" manet "github.com/multiformats/go-multiaddr/net"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view" "go.opencensus.io/stats/view"
"go.opencensus.io/tag" "go.opencensus.io/tag"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -68,14 +69,20 @@ var runCmd = &cli.Command{
return xerrors.Errorf("getting full node api: %w", err) return xerrors.Errorf("getting full node api: %w", err)
} }
defer ncloser() defer ncloser()
ctx := lcli.DaemonContext(cctx)
ctx, _ := tag.New(lcli.DaemonContext(cctx),
tag.Insert(metrics.Version, build.BuildVersion),
tag.Insert(metrics.Commit, build.CurrentCommit),
tag.Insert(metrics.NodeType, "miner"),
)
// Register all metric views // Register all metric views
if err := view.Register( if err = view.Register(
metrics.DefaultViews..., metrics.MinerNodeViews...,
); err != nil { ); err != nil {
log.Fatalf("Cannot register the view: %v", err) log.Fatalf("Cannot register the view: %v", err)
} }
// Set the metric to one so it is published to the exporter
stats.Record(ctx, metrics.LotusInfo.M(1))
v, err := nodeApi.Version(ctx) v, err := nodeApi.Version(ctx)
if err != nil { if err != nil {
@ -162,6 +169,7 @@ var runCmd = &cli.Command{
mux.Handle("/rpc/v0", rpcServer) mux.Handle("/rpc/v0", rpcServer)
mux.PathPrefix("/remote").HandlerFunc(minerapi.(*impl.StorageMinerAPI).ServeRemote) mux.PathPrefix("/remote").HandlerFunc(minerapi.(*impl.StorageMinerAPI).ServeRemote)
mux.Handle("/debug/metrics", metrics.Exporter())
mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof
ah := &auth.Handler{ ah := &auth.Handler{

View File

@ -192,7 +192,20 @@ var DaemonCmd = &cli.Command{
return fmt.Errorf("unrecognized profile type: %q", profile) return fmt.Errorf("unrecognized profile type: %q", profile)
} }
ctx, _ := tag.New(context.Background(), tag.Insert(metrics.Version, build.BuildVersion), tag.Insert(metrics.Commit, build.CurrentCommit)) ctx, _ := tag.New(context.Background(),
tag.Insert(metrics.Version, build.BuildVersion),
tag.Insert(metrics.Commit, build.CurrentCommit),
tag.Insert(metrics.NodeType, "chain"),
)
// Register all metric views
if err = view.Register(
metrics.ChainNodeViews...,
); err != nil {
log.Fatalf("Cannot register the view: %v", err)
}
// Set the metric to one so it is published to the exporter
stats.Record(ctx, metrics.LotusInfo.M(1))
{ {
dir, err := homedir.Expand(cctx.String("repo")) dir, err := homedir.Expand(cctx.String("repo"))
if err != nil { if err != nil {
@ -332,16 +345,6 @@ var DaemonCmd = &cli.Command{
} }
} }
// Register all metric views
if err = view.Register(
metrics.DefaultViews...,
); err != nil {
log.Fatalf("Cannot register the view: %v", err)
}
// Set the metric to one so it is published to the exporter
stats.Record(ctx, metrics.LotusInfo.M(1))
endpoint, err := r.APIEndpoint() endpoint, err := r.APIEndpoint()
if err != nil { if err != nil {
return xerrors.Errorf("getting api endpoint: %w", err) return xerrors.Errorf("getting api endpoint: %w", err)

View File

@ -15,12 +15,9 @@ import (
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net" manet "github.com/multiformats/go-multiaddr/net"
promclient "github.com/prometheus/client_golang/prometheus"
"go.opencensus.io/tag" "go.opencensus.io/tag"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"contrib.go.opencensus.io/exporter/prometheus"
"github.com/filecoin-project/go-jsonrpc" "github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-jsonrpc/auth" "github.com/filecoin-project/go-jsonrpc/auth"
@ -55,23 +52,7 @@ func serveRPC(a api.FullNode, stop node.StopFunc, addr multiaddr.Multiaddr, shut
http.Handle("/rest/v0/import", importAH) http.Handle("/rest/v0/import", importAH)
// Prometheus globals are exposed as interfaces, but the prometheus http.Handle("/debug/metrics", metrics.Exporter())
// OpenCensus exporter expects a concrete *Registry. The concrete type of
// the globals are actually *Registry, so we downcast them, staying
// defensive in case things change under the hood.
registry, ok := promclient.DefaultRegisterer.(*promclient.Registry)
if !ok {
log.Warnf("failed to export default prometheus registry; some metrics will be unavailable; unexpected type: %T", promclient.DefaultRegisterer)
}
exporter, err := prometheus.NewExporter(prometheus.Options{
Registry: registry,
Namespace: "lotus",
})
if err != nil {
log.Fatalf("could not create the prometheus stats exporter: %v", err)
}
http.Handle("/debug/metrics", exporter)
http.Handle("/debug/pprof-set/block", handleFractionOpt("BlockProfileRate", runtime.SetBlockProfileRate)) http.Handle("/debug/pprof-set/block", handleFractionOpt("BlockProfileRate", runtime.SetBlockProfileRate))
http.Handle("/debug/pprof-set/mutex", handleFractionOpt("MutexProfileFraction", http.Handle("/debug/pprof-set/mutex", handleFractionOpt("MutexProfileFraction",
func(x int) { runtime.SetMutexProfileFraction(x) }, func(x int) { runtime.SetMutexProfileFraction(x) },

View File

@ -632,47 +632,47 @@ func (m *Manager) Remove(ctx context.Context, sector storage.SectorRef) error {
} }
func (m *Manager) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error { func (m *Manager) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error {
return m.returnResult(callID, pi, err) return m.returnResult(ctx, callID, pi, err)
} }
func (m *Manager) ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err *storiface.CallError) error { func (m *Manager) ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err *storiface.CallError) error {
return m.returnResult(callID, p1o, err) return m.returnResult(ctx, callID, p1o, err)
} }
func (m *Manager) ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err *storiface.CallError) error { func (m *Manager) ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err *storiface.CallError) error {
return m.returnResult(callID, sealed, err) return m.returnResult(ctx, callID, sealed, err)
} }
func (m *Manager) ReturnSealCommit1(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err *storiface.CallError) error { func (m *Manager) ReturnSealCommit1(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err *storiface.CallError) error {
return m.returnResult(callID, out, err) return m.returnResult(ctx, callID, out, err)
} }
func (m *Manager) ReturnSealCommit2(ctx context.Context, callID storiface.CallID, proof storage.Proof, err *storiface.CallError) error { func (m *Manager) ReturnSealCommit2(ctx context.Context, callID storiface.CallID, proof storage.Proof, err *storiface.CallError) error {
return m.returnResult(callID, proof, err) return m.returnResult(ctx, callID, proof, err)
} }
func (m *Manager) ReturnFinalizeSector(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error { func (m *Manager) ReturnFinalizeSector(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(callID, nil, err) return m.returnResult(ctx, callID, nil, err)
} }
func (m *Manager) ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error { func (m *Manager) ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(callID, nil, err) return m.returnResult(ctx, callID, nil, err)
} }
func (m *Manager) ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error { func (m *Manager) ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(callID, nil, err) return m.returnResult(ctx, callID, nil, err)
} }
func (m *Manager) ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error { func (m *Manager) ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(callID, nil, err) return m.returnResult(ctx, callID, nil, err)
} }
func (m *Manager) ReturnReadPiece(ctx context.Context, callID storiface.CallID, ok bool, err *storiface.CallError) error { func (m *Manager) ReturnReadPiece(ctx context.Context, callID storiface.CallID, ok bool, err *storiface.CallError) error {
return m.returnResult(callID, ok, err) return m.returnResult(ctx, callID, ok, err)
} }
func (m *Manager) ReturnFetch(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error { func (m *Manager) ReturnFetch(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(callID, nil, err) return m.returnResult(ctx, callID, nil, err)
} }
func (m *Manager) StorageLocal(ctx context.Context) (map[stores.ID]string, error) { func (m *Manager) StorageLocal(ctx context.Context) (map[stores.ID]string, error) {

View File

@ -349,7 +349,7 @@ func (m *Manager) waitCall(ctx context.Context, callID storiface.CallID) (interf
} }
} }
func (m *Manager) returnResult(callID storiface.CallID, r interface{}, cerr *storiface.CallError) error { func (m *Manager) returnResult(ctx context.Context, callID storiface.CallID, r interface{}, cerr *storiface.CallError) error {
res := result{ res := result{
r: r, r: r,
} }
@ -357,7 +357,7 @@ func (m *Manager) returnResult(callID storiface.CallID, r interface{}, cerr *sto
res.err = cerr res.err = cerr
} }
m.sched.workTracker.onDone(callID) m.sched.workTracker.onDone(ctx, callID)
m.workLk.Lock() m.workLk.Lock()
defer m.workLk.Unlock() defer m.workLk.Unlock()
@ -413,5 +413,5 @@ func (m *Manager) returnResult(callID storiface.CallID, r interface{}, cerr *sto
func (m *Manager) Abort(ctx context.Context, call storiface.CallID) error { func (m *Manager) Abort(ctx context.Context, call storiface.CallID) error {
// TODO: Allow temp error // TODO: Allow temp error
return m.returnResult(call, nil, storiface.Err(storiface.ErrUnknown, xerrors.New("task aborted"))) return m.returnResult(ctx, call, nil, storiface.Err(storiface.ErrUnknown, xerrors.New("task aborted")))
} }

View File

@ -398,7 +398,7 @@ func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRe
go func() { go func() {
// first run the prepare step (e.g. fetching sector data from other worker) // first run the prepare step (e.g. fetching sector data from other worker)
err := req.prepare(req.ctx, sh.workTracker.worker(sw.wid, w.workerRpc)) err := req.prepare(req.ctx, sh.workTracker.worker(sw.wid, w.info, w.workerRpc))
sh.workersLk.Lock() sh.workersLk.Lock()
if err != nil { if err != nil {
@ -437,7 +437,7 @@ func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRe
} }
// Do the work! // Do the work!
err = req.work(req.ctx, sh.workTracker.worker(sw.wid, w.workerRpc)) err = req.work(req.ctx, sh.workTracker.worker(sw.wid, w.info, w.workerRpc))
select { select {
case req.ret <- workerResponse{err: err}: case req.ret <- workerResponse{err: err}:

View File

@ -7,17 +7,21 @@ import (
"time" "time"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks" "github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface" "github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"github.com/filecoin-project/lotus/metrics"
) )
type trackedWork struct { type trackedWork struct {
job storiface.WorkerJob job storiface.WorkerJob
worker WorkerID worker WorkerID
workerHostname string
} }
type workTracker struct { type workTracker struct {
@ -29,20 +33,31 @@ type workTracker struct {
// TODO: done, aggregate stats, queue stats, scheduler feedback // TODO: done, aggregate stats, queue stats, scheduler feedback
} }
func (wt *workTracker) onDone(callID storiface.CallID) { func (wt *workTracker) onDone(ctx context.Context, callID storiface.CallID) {
wt.lk.Lock() wt.lk.Lock()
defer wt.lk.Unlock() defer wt.lk.Unlock()
_, ok := wt.running[callID] t, ok := wt.running[callID]
if !ok { if !ok {
wt.done[callID] = struct{}{} wt.done[callID] = struct{}{}
stats.Record(ctx, metrics.WorkerUntrackedCallsReturned.M(1))
return return
} }
took := metrics.SinceInMilliseconds(t.job.Start)
ctx, _ = tag.New(
ctx,
tag.Upsert(metrics.TaskType, string(t.job.Task)),
tag.Upsert(metrics.WorkerHostname, t.workerHostname),
)
stats.Record(ctx, metrics.WorkerCallsReturnedCount.M(1), metrics.WorkerCallsReturnedDuration.M(took))
delete(wt.running, callID) delete(wt.running, callID)
} }
func (wt *workTracker) track(wid WorkerID, sid storage.SectorRef, task sealtasks.TaskType) func(storiface.CallID, error) (storiface.CallID, error) { func (wt *workTracker) track(ctx context.Context, wid WorkerID, wi storiface.WorkerInfo, sid storage.SectorRef, task sealtasks.TaskType) func(storiface.CallID, error) (storiface.CallID, error) {
return func(callID storiface.CallID, err error) (storiface.CallID, error) { return func(callID storiface.CallID, err error) (storiface.CallID, error) {
if err != nil { if err != nil {
return callID, err return callID, err
@ -64,17 +79,26 @@ func (wt *workTracker) track(wid WorkerID, sid storage.SectorRef, task sealtasks
Task: task, Task: task,
Start: time.Now(), Start: time.Now(),
}, },
worker: wid, worker: wid,
workerHostname: wi.Hostname,
} }
ctx, _ = tag.New(
ctx,
tag.Upsert(metrics.TaskType, string(task)),
tag.Upsert(metrics.WorkerHostname, wi.Hostname),
)
stats.Record(ctx, metrics.WorkerCallsStarted.M(1))
return callID, err return callID, err
} }
} }
func (wt *workTracker) worker(wid WorkerID, w Worker) Worker { func (wt *workTracker) worker(wid WorkerID, wi storiface.WorkerInfo, w Worker) Worker {
return &trackedWorker{ return &trackedWorker{
Worker: w, Worker: w,
wid: wid, wid: wid,
workerInfo: wi,
tracker: wt, tracker: wt,
} }
@ -94,45 +118,46 @@ func (wt *workTracker) Running() []trackedWork {
type trackedWorker struct { type trackedWorker struct {
Worker Worker
wid WorkerID wid WorkerID
workerInfo storiface.WorkerInfo
tracker *workTracker tracker *workTracker
} }
func (t *trackedWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) { func (t *trackedWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
return t.tracker.track(t.wid, sector, sealtasks.TTPreCommit1)(t.Worker.SealPreCommit1(ctx, sector, ticket, pieces)) return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTPreCommit1)(t.Worker.SealPreCommit1(ctx, sector, ticket, pieces))
} }
func (t *trackedWorker) SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (storiface.CallID, error) { func (t *trackedWorker) SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (storiface.CallID, error) {
return t.tracker.track(t.wid, sector, sealtasks.TTPreCommit2)(t.Worker.SealPreCommit2(ctx, sector, pc1o)) return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTPreCommit2)(t.Worker.SealPreCommit2(ctx, sector, pc1o))
} }
func (t *trackedWorker) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) { func (t *trackedWorker) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) {
return t.tracker.track(t.wid, sector, sealtasks.TTCommit1)(t.Worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids)) return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTCommit1)(t.Worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids))
} }
func (t *trackedWorker) SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (storiface.CallID, error) { func (t *trackedWorker) SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (storiface.CallID, error) {
return t.tracker.track(t.wid, sector, sealtasks.TTCommit2)(t.Worker.SealCommit2(ctx, sector, c1o)) return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTCommit2)(t.Worker.SealCommit2(ctx, sector, c1o))
} }
func (t *trackedWorker) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) { func (t *trackedWorker) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) {
return t.tracker.track(t.wid, sector, sealtasks.TTFinalize)(t.Worker.FinalizeSector(ctx, sector, keepUnsealed)) return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTFinalize)(t.Worker.FinalizeSector(ctx, sector, keepUnsealed))
} }
func (t *trackedWorker) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) { func (t *trackedWorker) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
return t.tracker.track(t.wid, sector, sealtasks.TTAddPiece)(t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData)) return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTAddPiece)(t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData))
} }
func (t *trackedWorker) Fetch(ctx context.Context, s storage.SectorRef, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) { func (t *trackedWorker) Fetch(ctx context.Context, s storage.SectorRef, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
return t.tracker.track(t.wid, s, sealtasks.TTFetch)(t.Worker.Fetch(ctx, s, ft, ptype, am)) return t.tracker.track(ctx, t.wid, t.workerInfo, s, sealtasks.TTFetch)(t.Worker.Fetch(ctx, s, ft, ptype, am))
} }
func (t *trackedWorker) UnsealPiece(ctx context.Context, id storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) { func (t *trackedWorker) UnsealPiece(ctx context.Context, id storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) {
return t.tracker.track(t.wid, id, sealtasks.TTUnseal)(t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid)) return t.tracker.track(ctx, t.wid, t.workerInfo, id, sealtasks.TTUnseal)(t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid))
} }
func (t *trackedWorker) ReadPiece(ctx context.Context, writer io.Writer, id storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) { func (t *trackedWorker) ReadPiece(ctx context.Context, writer io.Writer, id storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) {
return t.tracker.track(t.wid, id, sealtasks.TTReadUnsealed)(t.Worker.ReadPiece(ctx, writer, id, index, size)) return t.tracker.track(ctx, t.wid, t.workerInfo, id, sealtasks.TTReadUnsealed)(t.Worker.ReadPiece(ctx, writer, id, index, size))
} }
var _ Worker = &trackedWorker{} var _ Worker = &trackedWorker{}

32
metrics/exporter.go Normal file
View File

@ -0,0 +1,32 @@
package metrics
import (
"net/http"
_ "net/http/pprof"
"contrib.go.opencensus.io/exporter/prometheus"
logging "github.com/ipfs/go-log/v2"
promclient "github.com/prometheus/client_golang/prometheus"
)
var log = logging.Logger("metrics")
func Exporter() http.Handler {
// Prometheus globals are exposed as interfaces, but the prometheus
// OpenCensus exporter expects a concrete *Registry. The concrete type of
// the globals are actually *Registry, so we downcast them, staying
// defensive in case things change under the hood.
registry, ok := promclient.DefaultRegisterer.(*promclient.Registry)
if !ok {
log.Warnf("failed to export default prometheus registry; some metrics will be unavailable; unexpected type: %T", promclient.DefaultRegisterer)
}
exporter, err := prometheus.NewExporter(prometheus.Options{
Registry: registry,
Namespace: "lotus",
})
if err != nil {
log.Errorf("could not create the prometheus stats exporter: %v", err)
}
return exporter
}

View File

@ -15,14 +15,24 @@ import (
// Distribution // Distribution
var defaultMillisecondsDistribution = view.Distribution(0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 3000, 4000, 5000, 7500, 10000, 20000, 50000, 100000) var defaultMillisecondsDistribution = view.Distribution(0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 3000, 4000, 5000, 7500, 10000, 20000, 50000, 100000)
var workMillisecondsDistribution = view.Distribution(
250, 500, 1000, 2000, 5000, 10_000, 30_000, 60_000, 2*60_000, 5*60_000, 10*60_000, 15*60_000, 30*60_000, // short sealing tasks
40*60_000, 45*60_000, 50*60_000, 55*60_000, 60*60_000, 65*60_000, 70*60_000, 75*60_000, 80*60_000, 85*60_000, 100*60_000, 120*60_000, // PC2 / C2 range
130*60_000, 140*60_000, 150*60_000, 160*60_000, 180*60_000, 200*60_000, 220*60_000, 260*60_000, 300*60_000, // PC1 range
350*60_000, 400*60_000, 600*60_000, 800*60_000, 1000*60_000, 1300*60_000, 1800*60_000, 4000*60_000, 10000*60_000, // intel PC1 range
)
// Global Tags // Global Tags
var ( var (
Version, _ = tag.NewKey("version") // common
Commit, _ = tag.NewKey("commit") Version, _ = tag.NewKey("version")
PeerID, _ = tag.NewKey("peer_id") Commit, _ = tag.NewKey("commit")
MinerID, _ = tag.NewKey("miner_id") NodeType, _ = tag.NewKey("node_type")
FailureType, _ = tag.NewKey("failure_type") PeerID, _ = tag.NewKey("peer_id")
MinerID, _ = tag.NewKey("miner_id")
FailureType, _ = tag.NewKey("failure_type")
// chain
Local, _ = tag.NewKey("local") Local, _ = tag.NewKey("local")
MessageFrom, _ = tag.NewKey("message_from") MessageFrom, _ = tag.NewKey("message_from")
MessageTo, _ = tag.NewKey("message_to") MessageTo, _ = tag.NewKey("message_to")
@ -30,11 +40,20 @@ var (
ReceivedFrom, _ = tag.NewKey("received_from") ReceivedFrom, _ = tag.NewKey("received_from")
Endpoint, _ = tag.NewKey("endpoint") Endpoint, _ = tag.NewKey("endpoint")
APIInterface, _ = tag.NewKey("api") // to distinguish between gateway api and full node api endpoint calls APIInterface, _ = tag.NewKey("api") // to distinguish between gateway api and full node api endpoint calls
// miner
TaskType, _ = tag.NewKey("task_type")
WorkerHostname, _ = tag.NewKey("worker_hostname")
) )
// Measures // Measures
var ( var (
LotusInfo = stats.Int64("info", "Arbitrary counter to tag lotus info to", stats.UnitDimensionless) // common
LotusInfo = stats.Int64("info", "Arbitrary counter to tag lotus info to", stats.UnitDimensionless)
PeerCount = stats.Int64("peer/count", "Current number of FIL peers", stats.UnitDimensionless)
APIRequestDuration = stats.Float64("api/request_duration_ms", "Duration of API requests", stats.UnitMilliseconds)
// chain
ChainNodeHeight = stats.Int64("chain/node_height", "Current Height of the node", stats.UnitDimensionless) ChainNodeHeight = stats.Int64("chain/node_height", "Current Height of the node", stats.UnitDimensionless)
ChainNodeHeightExpected = stats.Int64("chain/node_height_expected", "Expected Height of the node", stats.UnitDimensionless) ChainNodeHeightExpected = stats.Int64("chain/node_height_expected", "Expected Height of the node", stats.UnitDimensionless)
ChainNodeWorkerHeight = stats.Int64("chain/node_worker_height", "Current Height of workers on the node", stats.UnitDimensionless) ChainNodeWorkerHeight = stats.Int64("chain/node_worker_height", "Current Height of workers on the node", stats.UnitDimensionless)
@ -48,7 +67,6 @@ var (
BlockValidationSuccess = stats.Int64("block/success", "Counter for block validation successes", stats.UnitDimensionless) BlockValidationSuccess = stats.Int64("block/success", "Counter for block validation successes", stats.UnitDimensionless)
BlockValidationDurationMilliseconds = stats.Float64("block/validation_ms", "Duration for Block Validation in ms", stats.UnitMilliseconds) BlockValidationDurationMilliseconds = stats.Float64("block/validation_ms", "Duration for Block Validation in ms", stats.UnitMilliseconds)
BlockDelay = stats.Int64("block/delay", "Delay of accepted blocks, where delay is >5s", stats.UnitMilliseconds) BlockDelay = stats.Int64("block/delay", "Delay of accepted blocks, where delay is >5s", stats.UnitMilliseconds)
PeerCount = stats.Int64("peer/count", "Current number of FIL peers", stats.UnitDimensionless)
PubsubPublishMessage = stats.Int64("pubsub/published", "Counter for total published messages", stats.UnitDimensionless) PubsubPublishMessage = stats.Int64("pubsub/published", "Counter for total published messages", stats.UnitDimensionless)
PubsubDeliverMessage = stats.Int64("pubsub/delivered", "Counter for total delivered messages", stats.UnitDimensionless) PubsubDeliverMessage = stats.Int64("pubsub/delivered", "Counter for total delivered messages", stats.UnitDimensionless)
PubsubRejectMessage = stats.Int64("pubsub/rejected", "Counter for total rejected messages", stats.UnitDimensionless) PubsubRejectMessage = stats.Int64("pubsub/rejected", "Counter for total rejected messages", stats.UnitDimensionless)
@ -56,9 +74,14 @@ var (
PubsubRecvRPC = stats.Int64("pubsub/recv_rpc", "Counter for total received RPCs", stats.UnitDimensionless) PubsubRecvRPC = stats.Int64("pubsub/recv_rpc", "Counter for total received RPCs", stats.UnitDimensionless)
PubsubSendRPC = stats.Int64("pubsub/send_rpc", "Counter for total sent RPCs", stats.UnitDimensionless) PubsubSendRPC = stats.Int64("pubsub/send_rpc", "Counter for total sent RPCs", stats.UnitDimensionless)
PubsubDropRPC = stats.Int64("pubsub/drop_rpc", "Counter for total dropped RPCs", stats.UnitDimensionless) PubsubDropRPC = stats.Int64("pubsub/drop_rpc", "Counter for total dropped RPCs", stats.UnitDimensionless)
APIRequestDuration = stats.Float64("api/request_duration_ms", "Duration of API requests", stats.UnitMilliseconds)
VMFlushCopyDuration = stats.Float64("vm/flush_copy_ms", "Time spent in VM Flush Copy", stats.UnitMilliseconds) VMFlushCopyDuration = stats.Float64("vm/flush_copy_ms", "Time spent in VM Flush Copy", stats.UnitMilliseconds)
VMFlushCopyCount = stats.Int64("vm/flush_copy_count", "Number of copied objects", stats.UnitDimensionless) VMFlushCopyCount = stats.Int64("vm/flush_copy_count", "Number of copied objects", stats.UnitDimensionless)
// miner
WorkerCallsStarted = stats.Int64("sealing/worker_calls_started", "Counter of started worker tasks", stats.UnitDimensionless)
WorkerCallsReturnedCount = stats.Int64("sealing/worker_calls_returned_count", "Counter of returned worker tasks", stats.UnitDimensionless)
WorkerCallsReturnedDuration = stats.Float64("sealing/worker_calls_returned_ms", "Counter of returned worker tasks", stats.UnitMilliseconds)
WorkerUntrackedCallsReturned = stats.Int64("sealing/worker_untracked_calls_returned", "Counter of returned untracked worker tasks", stats.UnitDimensionless)
) )
var ( var (
@ -178,41 +201,72 @@ var (
Measure: VMFlushCopyCount, Measure: VMFlushCopyCount,
Aggregation: view.Sum(), Aggregation: view.Sum(),
} }
// miner
WorkerCallsStartedView = &view.View{
Measure: WorkerCallsStarted,
Aggregation: view.Count(),
TagKeys: []tag.Key{TaskType, WorkerHostname},
}
WorkerCallsReturnedCountView = &view.View{
Measure: WorkerCallsReturnedCount,
Aggregation: view.Count(),
TagKeys: []tag.Key{TaskType, WorkerHostname},
}
WorkerUntrackedCallsReturnedView = &view.View{
Measure: WorkerUntrackedCallsReturned,
Aggregation: view.Count(),
}
WorkerCallsReturnedDurationView = &view.View{
Measure: WorkerCallsReturnedDuration,
Aggregation: workMillisecondsDistribution,
TagKeys: []tag.Key{TaskType, WorkerHostname},
}
) )
// DefaultViews is an array of OpenCensus views for metric gathering purposes // DefaultViews is an array of OpenCensus views for metric gathering purposes
var DefaultViews = func() []*view.View { var DefaultViews = func() []*view.View {
views := []*view.View{ views := []*view.View{
InfoView, InfoView,
ChainNodeHeightView,
ChainNodeHeightExpectedView,
ChainNodeWorkerHeightView,
BlockReceivedView,
BlockValidationFailureView,
BlockValidationSuccessView,
BlockValidationDurationView,
BlockDelayView,
MessagePublishedView,
MessageReceivedView,
MessageValidationFailureView,
MessageValidationSuccessView,
PeerCountView, PeerCountView,
PubsubPublishMessageView,
PubsubDeliverMessageView,
PubsubRejectMessageView,
PubsubDuplicateMessageView,
PubsubRecvRPCView,
PubsubSendRPCView,
PubsubDropRPCView,
APIRequestDurationView, APIRequestDurationView,
VMFlushCopyCountView,
VMFlushCopyDurationView,
} }
views = append(views, blockstore.DefaultViews...) views = append(views, blockstore.DefaultViews...)
views = append(views, rpcmetrics.DefaultViews...) views = append(views, rpcmetrics.DefaultViews...)
return views return views
}() }()
var ChainNodeViews = append([]*view.View{
ChainNodeHeightView,
ChainNodeHeightExpectedView,
ChainNodeWorkerHeightView,
BlockReceivedView,
BlockValidationFailureView,
BlockValidationSuccessView,
BlockValidationDurationView,
BlockDelayView,
MessagePublishedView,
MessageReceivedView,
MessageValidationFailureView,
MessageValidationSuccessView,
PubsubPublishMessageView,
PubsubDeliverMessageView,
PubsubRejectMessageView,
PubsubDuplicateMessageView,
PubsubRecvRPCView,
PubsubSendRPCView,
PubsubDropRPCView,
VMFlushCopyCountView,
VMFlushCopyDurationView,
}, DefaultViews...)
var MinerNodeViews = append([]*view.View{
WorkerCallsStartedView,
WorkerCallsReturnedCountView,
WorkerUntrackedCallsReturnedView,
WorkerCallsReturnedDurationView,
}, DefaultViews...)
// SinceInMilliseconds returns the duration of time since the provide time as a float64. // SinceInMilliseconds returns the duration of time since the provide time as a float64.
func SinceInMilliseconds(startTime time.Time) float64 { func SinceInMilliseconds(startTime time.Time) float64 {
return float64(time.Since(startTime).Nanoseconds()) / 1e6 return float64(time.Since(startTime).Nanoseconds()) / 1e6