From 353c5d8b123fce0e1c46068a4f741856810b46d0 Mon Sep 17 00:00:00 2001 From: Nate Walck Date: Tue, 25 Feb 2020 21:42:34 -0500 Subject: [PATCH 1/5] Relocation opencensus metrics to its own package and add more node stats --- chain/sync.go | 7 +++++ cmd/lotus/daemon.go | 23 ++++----------- lib/jsonrpc/handler.go | 8 ++++++ metrics/metrics.go | 64 ++++++++++++++++++++++++++++++++++++++++++ peermgr/peermgr.go | 3 ++ 5 files changed, 88 insertions(+), 17 deletions(-) create mode 100644 metrics/metrics.go diff --git a/chain/sync.go b/chain/sync.go index f5f915183..d04feab9c 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -23,6 +23,7 @@ import ( "github.com/libp2p/go-libp2p-core/peer" cbg "github.com/whyrusleeping/cbor-gen" "github.com/whyrusleeping/pubsub" + "go.opencensus.io/stats" "go.opencensus.io/trace" "golang.org/x/xerrors" @@ -38,6 +39,7 @@ import ( "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/lib/sigs" + "github.com/filecoin-project/lotus/metrics" ) var log = logging.Logger("chain") @@ -394,6 +396,9 @@ func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error { ) } + // Record current network chain height when sync is called + stats.Record(ctx, metrics.ChainHeight.M(int64(maybeHead.Height()))) + if syncer.store.GetHeaviestTipSet().ParentWeight().GreaterThan(maybeHead.ParentWeight()) { return nil } @@ -1038,6 +1043,8 @@ func (syncer *Syncer) syncMessagesAndCheckState(ctx context.Context, headers []* return xerrors.Errorf("message processing failed: %w", err) } + // Set current node sync height + stats.Record(ctx, metrics.ChainNodeHeight.M(int64(fts.TipSet().Height()))) ss.SetHeight(fts.TipSet().Height()) return nil diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index 2d2759cf4..ad3a3b823 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -24,6 +24,7 @@ import ( "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/vm" + "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/node/modules" "github.com/filecoin-project/lotus/node/modules/testing" @@ -36,12 +37,6 @@ const ( preSealedSectorsFlag = "genesis-presealed-sectors" ) -var ( - lotusInfo = stats.Int64("info", "Arbitrary counter to tag lotus info to", stats.UnitDimensionless) - version, _ = tag.NewKey("version") - commit, _ = tag.NewKey("commit") -) - // DaemonCmd is the `go-lotus daemon` command var DaemonCmd = &cli.Command{ Name: "daemon", @@ -99,7 +94,7 @@ var DaemonCmd = &cli.Command{ defer pprof.StopCPUProfile() } - ctx, _ := tag.New(context.Background(), tag.Insert(version, build.BuildVersion), tag.Insert(commit, build.CurrentCommit)) + ctx, _ := tag.New(context.Background(), tag.Insert(metrics.Version, build.BuildVersion), tag.Insert(metrics.Commit, build.CurrentCommit)) { dir, err := homedir.Expand(cctx.String("repo")) if err != nil { @@ -180,21 +175,15 @@ var DaemonCmd = &cli.Command{ return xerrors.Errorf("initializing node: %w", err) } - // We are using this metric to tag info about lotus even though - // it doesn't contain any actual metrics + // Register all metric views if err = view.Register( - &view.View{ - Name: "info", - Description: "Lotus node information", - Measure: lotusInfo, - Aggregation: view.LastValue(), - TagKeys: []tag.Key{version, commit}, - }, + metrics.DefaultViews..., ); err != nil { log.Fatalf("Cannot register the view: %v", err) } + // Set the metric to one so it is published to the exporter - stats.Record(ctx, lotusInfo.M(1)) + stats.Record(ctx, metrics.LotusInfo.M(1)) endpoint, err := r.APIEndpoint() if err != nil { diff --git a/lib/jsonrpc/handler.go b/lib/jsonrpc/handler.go index 0edc25fa9..b42b9d925 100644 --- a/lib/jsonrpc/handler.go +++ b/lib/jsonrpc/handler.go @@ -9,6 +9,9 @@ import ( "io" "reflect" + "github.com/filecoin-project/lotus/metrics" + "go.opencensus.io/stats" + "go.opencensus.io/tag" "go.opencensus.io/trace" "go.opencensus.io/trace/propagation" "golang.org/x/xerrors" @@ -151,12 +154,15 @@ func (handlers) getSpan(ctx context.Context, req request) (context.Context, *tra } func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer)), rpcError rpcErrFunc, done func(keepCtx bool), chOut chanOut) { + // Not sure if we need to sanitize the incoming req.Method or not. ctx, span := h.getSpan(ctx, req) + ctx, _ = tag.New(context.Background(), tag.Insert(metrics.RPCMethod, req.Method)) defer span.End() handler, ok := h[req.Method] if !ok { rpcError(w, &req, rpcMethodNotFound, fmt.Errorf("method '%s' not found", req.Method)) + stats.Record(ctx, metrics.RPCInvalidMethod.M(1)) done(false) return } @@ -201,6 +207,7 @@ func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer if req.ID == nil { return // notification } + stats.Record(ctx, metrics.RPCRequestSuccess.M(1)) /////////////////// @@ -245,5 +252,6 @@ func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer log.Error(err) return } + stats.Record(ctx, metrics.RPCResponseSuccess.M(1)) }) } diff --git a/metrics/metrics.go b/metrics/metrics.go new file mode 100644 index 000000000..5e1b78f93 --- /dev/null +++ b/metrics/metrics.go @@ -0,0 +1,64 @@ +package metrics + +import ( + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" +) + +// Global Tags +var ( + Version, _ = tag.NewKey("version") + Commit, _ = tag.NewKey("commit") + RPCMethod, _ = tag.NewKey("method") +) + +// Measures +var ( + LotusInfo = stats.Int64("info", "Arbitrary counter to tag lotus info to", stats.UnitDimensionless) + ChainHeight = stats.Int64("chain/height", "Current Height of the chain", stats.UnitDimensionless) + ChainNodeHeight = stats.Int64("chain/node_height", "Current Height of the node", stats.UnitDimensionless) + PeerCount = stats.Int64("peer/count", "Current number of FIL peers", stats.UnitDimensionless) + RPCInvalidMethod = stats.Int64("rpc/invalid_method", "Total number of invalid RPC methods called", stats.UnitDimensionless) + RPCRequestSuccess = stats.Int64("rpc/request_success", "Total number of successful requests handled", stats.UnitDimensionless) + RPCResponseSuccess = stats.Int64("rpc/response_success", "Total number of succeessful responses handled", stats.UnitDimensionless) +) + +// DefaultViews is an array of Consensus views for metric gathering purposes +var DefaultViews = []*view.View{ + &view.View{ + Name: "info", + Description: "Lotus node information", + Measure: LotusInfo, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{Version, Commit}, + }, + &view.View{ + Measure: ChainHeight, + Aggregation: view.LastValue(), + }, + &view.View{ + Measure: ChainNodeHeight, + Aggregation: view.LastValue(), + }, + // All RPC related metrics should at the very least tag the RPCMethod + &view.View{ + Measure: RPCInvalidMethod, + Aggregation: view.Count(), + TagKeys: []tag.Key{RPCMethod}, + }, + &view.View{ + Measure: RPCRequestSuccess, + Aggregation: view.Count(), + TagKeys: []tag.Key{RPCMethod}, + }, + &view.View{ + Measure: RPCResponseSuccess, + Aggregation: view.Count(), + TagKeys: []tag.Key{RPCMethod}, + }, + &view.View{ + Measure: PeerCount, + Aggregation: view.LastValue(), + }, +} diff --git a/peermgr/peermgr.go b/peermgr/peermgr.go index e1b41e14d..490f2b229 100644 --- a/peermgr/peermgr.go +++ b/peermgr/peermgr.go @@ -5,7 +5,9 @@ import ( "sync" "time" + "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/node/modules/dtypes" + "go.opencensus.io/stats" "go.uber.org/fx" host "github.com/libp2p/go-libp2p-core/host" @@ -115,6 +117,7 @@ func (pmgr *PeerMgr) Run(ctx context.Context) { } else if pcount > pmgr.maxFilPeers { log.Debug("peer count about threshold: %d > %d", pcount, pmgr.maxFilPeers) } + stats.Record(ctx, metrics.PeerCount.M(int64(pmgr.getPeerCount()))) } } } From 7db39115e85fc3d595291f1c220e41b43da0077d Mon Sep 17 00:00:00 2001 From: Nate Walck Date: Thu, 27 Feb 2020 22:44:12 -0500 Subject: [PATCH 2/5] Fixed ctx issue, changed to track failures instead of success --- lib/jsonrpc/handler.go | 11 ++++++++--- metrics/metrics.go | 18 +++++++++--------- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/lib/jsonrpc/handler.go b/lib/jsonrpc/handler.go index b42b9d925..88a20ee1f 100644 --- a/lib/jsonrpc/handler.go +++ b/lib/jsonrpc/handler.go @@ -156,7 +156,7 @@ func (handlers) getSpan(ctx context.Context, req request) (context.Context, *tra func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer)), rpcError rpcErrFunc, done func(keepCtx bool), chOut chanOut) { // Not sure if we need to sanitize the incoming req.Method or not. ctx, span := h.getSpan(ctx, req) - ctx, _ = tag.New(context.Background(), tag.Insert(metrics.RPCMethod, req.Method)) + ctx, _ = tag.New(ctx, tag.Insert(metrics.RPCMethod, req.Method)) defer span.End() handler, ok := h[req.Method] @@ -169,6 +169,7 @@ func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer if len(req.Params) != handler.nParams { rpcError(w, &req, rpcInvalidParams, fmt.Errorf("wrong param count")) + stats.Record(ctx, metrics.RPCRequestError.M(1)) done(false) return } @@ -178,6 +179,7 @@ func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer if chOut == nil && outCh { rpcError(w, &req, rpcMethodNotFound, fmt.Errorf("method '%s' not supported in this mode (no out channel support)", req.Method)) + stats.Record(ctx, metrics.RPCRequestError.M(1)) return } @@ -191,6 +193,7 @@ func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer rp := reflect.New(handler.paramReceivers[i]) if err := json.NewDecoder(bytes.NewReader(req.Params[i].data)).Decode(rp.Interface()); err != nil { rpcError(w, &req, rpcParseError, xerrors.Errorf("unmarshaling params for '%s': %w", handler.handlerFunc, err)) + stats.Record(ctx, metrics.RPCRequestError.M(1)) return } @@ -202,12 +205,12 @@ func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer callResult, err := doCall(req.Method, handler.handlerFunc, callParams) if err != nil { rpcError(w, &req, 0, xerrors.Errorf("fatal error calling '%s': %w", req.Method, err)) + stats.Record(ctx, metrics.RPCRequestError.M(1)) return } if req.ID == nil { return // notification } - stats.Record(ctx, metrics.RPCRequestSuccess.M(1)) /////////////////// @@ -220,6 +223,7 @@ func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer err := callResult[handler.errOut].Interface() if err != nil { log.Warnf("error in RPC call to '%s': %+v", req.Method, err) + stats.Record(ctx, metrics.RPCResponseError.M(1)) resp.Error = &respError{ Code: 1, Message: err.(error).Error(), @@ -241,6 +245,7 @@ func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer } log.Warnf("failed to setup channel in RPC call to '%s': %+v", req.Method, err) + stats.Record(ctx, metrics.RPCResponseError.M(1)) resp.Error = &respError{ Code: 1, Message: err.(error).Error(), @@ -250,8 +255,8 @@ func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer w(func(w io.Writer) { if err := json.NewEncoder(w).Encode(resp); err != nil { log.Error(err) + stats.Record(ctx, metrics.RPCResponseError.M(1)) return } - stats.Record(ctx, metrics.RPCResponseSuccess.M(1)) }) } diff --git a/metrics/metrics.go b/metrics/metrics.go index 5e1b78f93..ca32b81b5 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -15,13 +15,13 @@ var ( // Measures var ( - LotusInfo = stats.Int64("info", "Arbitrary counter to tag lotus info to", stats.UnitDimensionless) - ChainHeight = stats.Int64("chain/height", "Current Height of the chain", stats.UnitDimensionless) - ChainNodeHeight = stats.Int64("chain/node_height", "Current Height of the node", stats.UnitDimensionless) - PeerCount = stats.Int64("peer/count", "Current number of FIL peers", stats.UnitDimensionless) - RPCInvalidMethod = stats.Int64("rpc/invalid_method", "Total number of invalid RPC methods called", stats.UnitDimensionless) - RPCRequestSuccess = stats.Int64("rpc/request_success", "Total number of successful requests handled", stats.UnitDimensionless) - RPCResponseSuccess = stats.Int64("rpc/response_success", "Total number of succeessful responses handled", stats.UnitDimensionless) + LotusInfo = stats.Int64("info", "Arbitrary counter to tag lotus info to", stats.UnitDimensionless) + ChainHeight = stats.Int64("chain/height", "Current Height of the chain", stats.UnitDimensionless) + ChainNodeHeight = stats.Int64("chain/node_height", "Current Height of the node", stats.UnitDimensionless) + PeerCount = stats.Int64("peer/count", "Current number of FIL peers", stats.UnitDimensionless) + RPCInvalidMethod = stats.Int64("rpc/invalid_method", "Total number of invalid RPC methods called", stats.UnitDimensionless) + RPCRequestError = stats.Int64("rpc/request_error", "Total number of request errors handled", stats.UnitDimensionless) + RPCResponseError = stats.Int64("rpc/response_error", "Total number of responses errors handled", stats.UnitDimensionless) ) // DefaultViews is an array of Consensus views for metric gathering purposes @@ -48,12 +48,12 @@ var DefaultViews = []*view.View{ TagKeys: []tag.Key{RPCMethod}, }, &view.View{ - Measure: RPCRequestSuccess, + Measure: RPCRequestError, Aggregation: view.Count(), TagKeys: []tag.Key{RPCMethod}, }, &view.View{ - Measure: RPCResponseSuccess, + Measure: RPCResponseError, Aggregation: view.Count(), TagKeys: []tag.Key{RPCMethod}, }, From 33af2409e8206213098f3d3216e4e1b283778174 Mon Sep 17 00:00:00 2001 From: Nate Walck Date: Sun, 1 Mar 2020 19:26:09 -0500 Subject: [PATCH 3/5] Use head notif func for current node height, add pubsub metrics --- chain/store/store.go | 12 ++++++++++- chain/sub/incoming.go | 13 ++++++++++++ chain/sync.go | 6 +----- metrics/metrics.go | 48 ++++++++++++++++++++++++++++--------------- 4 files changed, 57 insertions(+), 22 deletions(-) diff --git a/chain/store/store.go b/chain/store/store.go index 2f024d6b1..f98f1df13 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -13,6 +13,8 @@ import ( "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/state" "github.com/filecoin-project/lotus/chain/vm" + "github.com/filecoin-project/lotus/metrics" + "go.opencensus.io/stats" "go.opencensus.io/trace" "go.uber.org/multierr" @@ -100,7 +102,15 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls *types.VMSy return nil } - cs.headChangeNotifs = append(cs.headChangeNotifs, hcnf) + hcmetric := func(rev, app []*types.TipSet) error { + ctx := context.Background() + for _, r := range app { + stats.Record(ctx, metrics.ChainNodeHeight.M(int64(r.Height()))) + } + return nil + } + + cs.headChangeNotifs = append(cs.headChangeNotifs, hcnf, hcmetric) return cs } diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index b4bd8b3fa..93977c6c6 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -2,6 +2,7 @@ package sub import ( "context" + "fmt" "time" lru "github.com/hashicorp/golang-lru" @@ -10,11 +11,14 @@ import ( connmgr "github.com/libp2p/go-libp2p-core/connmgr" peer "github.com/libp2p/go-libp2p-peer" pubsub "github.com/libp2p/go-libp2p-pubsub" + "go.opencensus.io/stats" + "go.opencensus.io/tag" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/metrics" ) var log = logging.Logger("sub") @@ -162,14 +166,23 @@ func NewMessageValidator(mp *messagepool.MessagePool) *MessageValidator { } func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) bool { + ctx, _ = tag.New(ctx, tag.Insert(metrics.PeerID, pid.String())) m, err := types.DecodeSignedMessage(msg.Message.GetData()) if err != nil { log.Warnf("failed to decode incoming message: %s", err) + stats.Record(ctx, metrics.MessageDecodeFailure.M(1)) return false } if err := mv.mpool.Add(m); err != nil { log.Warnf("failed to add message from network to message pool (From: %s, To: %s, Nonce: %d, Value: %s): %s", m.Message.From, m.Message.To, m.Message.Nonce, types.FIL(m.Message.Value), err) + ctx, _ = tag.New( + ctx, + tag.Insert(metrics.MessageFrom, m.Message.From.String()), + tag.Insert(metrics.MessageTo, m.Message.To.String()), + tag.Insert(metrics.MessageNonce, fmt.Sprint(m.Message.Nonce)), + ) + stats.Record(ctx, metrics.MessageAddFailure.M(1)) return false } diff --git a/chain/sync.go b/chain/sync.go index d04feab9c..9d875e017 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -396,9 +396,6 @@ func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error { ) } - // Record current network chain height when sync is called - stats.Record(ctx, metrics.ChainHeight.M(int64(maybeHead.Height()))) - if syncer.store.GetHeaviestTipSet().ParentWeight().GreaterThan(maybeHead.ParentWeight()) { return nil } @@ -1043,8 +1040,7 @@ func (syncer *Syncer) syncMessagesAndCheckState(ctx context.Context, headers []* return xerrors.Errorf("message processing failed: %w", err) } - // Set current node sync height - stats.Record(ctx, metrics.ChainNodeHeight.M(int64(fts.TipSet().Height()))) + stats.Record(ctx, metrics.ChainNodeWorkerHeight.M(int64(fts.TipSet().Height()))) ss.SetHeight(fts.TipSet().Height()) return nil diff --git a/metrics/metrics.go b/metrics/metrics.go index ca32b81b5..e10beda8d 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -8,20 +8,26 @@ import ( // Global Tags var ( - Version, _ = tag.NewKey("version") - Commit, _ = tag.NewKey("commit") - RPCMethod, _ = tag.NewKey("method") + Version, _ = tag.NewKey("version") + Commit, _ = tag.NewKey("commit") + RPCMethod, _ = tag.NewKey("method") + PeerID, _ = tag.NewKey("peer_id") + MessageFrom, _ = tag.NewKey("message_from") + MessageTo, _ = tag.NewKey("message_to") + MessageNonce, _ = tag.NewKey("message_nonce") ) // Measures var ( - LotusInfo = stats.Int64("info", "Arbitrary counter to tag lotus info to", stats.UnitDimensionless) - ChainHeight = stats.Int64("chain/height", "Current Height of the chain", stats.UnitDimensionless) - ChainNodeHeight = stats.Int64("chain/node_height", "Current Height of the node", stats.UnitDimensionless) - PeerCount = stats.Int64("peer/count", "Current number of FIL peers", stats.UnitDimensionless) - RPCInvalidMethod = stats.Int64("rpc/invalid_method", "Total number of invalid RPC methods called", stats.UnitDimensionless) - RPCRequestError = stats.Int64("rpc/request_error", "Total number of request errors handled", stats.UnitDimensionless) - RPCResponseError = stats.Int64("rpc/response_error", "Total number of responses errors handled", stats.UnitDimensionless) + LotusInfo = stats.Int64("info", "Arbitrary counter to tag lotus info to", stats.UnitDimensionless) + ChainNodeHeight = stats.Int64("chain/node_height", "Current Height of the node", stats.UnitDimensionless) + ChainNodeWorkerHeight = stats.Int64("chain/node_worker_height", "Current Height of workers on the node", stats.UnitDimensionless) + MessageAddFailure = stats.Int64("message/add_faliure", "Counter for messages that failed to be added", stats.UnitDimensionless) + MessageDecodeFailure = stats.Int64("message/decode_faliure", "Counter for messages that failed to be decoded", stats.UnitDimensionless) + PeerCount = stats.Int64("peer/count", "Current number of FIL peers", stats.UnitDimensionless) + RPCInvalidMethod = stats.Int64("rpc/invalid_method", "Total number of invalid RPC methods called", stats.UnitDimensionless) + RPCRequestError = stats.Int64("rpc/request_error", "Total number of request errors handled", stats.UnitDimensionless) + RPCResponseError = stats.Int64("rpc/response_error", "Total number of responses errors handled", stats.UnitDimensionless) ) // DefaultViews is an array of Consensus views for metric gathering purposes @@ -34,11 +40,25 @@ var DefaultViews = []*view.View{ TagKeys: []tag.Key{Version, Commit}, }, &view.View{ - Measure: ChainHeight, + Measure: ChainNodeHeight, Aggregation: view.LastValue(), }, &view.View{ - Measure: ChainNodeHeight, + Measure: ChainNodeWorkerHeight, + Aggregation: view.LastValue(), + }, + &view.View{ + Measure: MessageAddFailure, + Aggregation: view.Count(), + TagKeys: []tag.Key{MessageFrom, MessageTo, MessageNonce}, + }, + &view.View{ + Measure: MessageDecodeFailure, + Aggregation: view.Count(), + TagKeys: []tag.Key{PeerID}, + }, + &view.View{ + Measure: PeerCount, Aggregation: view.LastValue(), }, // All RPC related metrics should at the very least tag the RPCMethod @@ -57,8 +77,4 @@ var DefaultViews = []*view.View{ Aggregation: view.Count(), TagKeys: []tag.Key{RPCMethod}, }, - &view.View{ - Measure: PeerCount, - Aggregation: view.LastValue(), - }, } From 8283bb994a53afed7379c9fd5633fc932a33484f Mon Sep 17 00:00:00 2001 From: Nate Walck Date: Sun, 1 Mar 2020 19:57:16 -0500 Subject: [PATCH 4/5] Add block metrics to incoming pubsub validate funcs --- chain/sub/incoming.go | 20 +++++++++++++++--- metrics/metrics.go | 48 +++++++++++++++++++++++++++++++------------ 2 files changed, 52 insertions(+), 16 deletions(-) diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index 93977c6c6..03c9feaf0 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -111,15 +111,25 @@ func (bv *BlockValidator) flagPeer(p peer.ID) { } func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) bool { + stats.Record(ctx, metrics.BlockReceived.M(1)) + ctx, _ = tag.New( + ctx, + tag.Insert(metrics.PeerID, pid.String()), + tag.Insert(metrics.ReceivedFrom, msg.ReceivedFrom.String()), + ) blk, err := types.DecodeBlockMsg(msg.GetData()) if err != nil { log.Error("got invalid block over pubsub: ", err) + ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, "invalid")) + stats.Record(ctx, metrics.BlockValidationFailure.M(1)) bv.flagPeer(pid) return false } if len(blk.BlsMessages)+len(blk.SecpkMessages) > build.BlockMessageLimit { log.Warnf("received block with too many messages over pubsub") + ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, "too_many_messages")) + stats.Record(ctx, metrics.BlockValidationFailure.M(1)) bv.flagPeer(pid) return false } @@ -131,6 +141,7 @@ func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub } msg.ValidatorData = blk + stats.Record(ctx, metrics.BlockValidationSuccess.M(1)) return true } @@ -166,11 +177,13 @@ func NewMessageValidator(mp *messagepool.MessagePool) *MessageValidator { } func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) bool { + stats.Record(ctx, metrics.MessageReceived.M(1)) ctx, _ = tag.New(ctx, tag.Insert(metrics.PeerID, pid.String())) m, err := types.DecodeSignedMessage(msg.Message.GetData()) if err != nil { log.Warnf("failed to decode incoming message: %s", err) - stats.Record(ctx, metrics.MessageDecodeFailure.M(1)) + ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, "decode")) + stats.Record(ctx, metrics.MessageValidationFailure.M(1)) return false } @@ -181,11 +194,12 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs tag.Insert(metrics.MessageFrom, m.Message.From.String()), tag.Insert(metrics.MessageTo, m.Message.To.String()), tag.Insert(metrics.MessageNonce, fmt.Sprint(m.Message.Nonce)), + tag.Insert(metrics.FailureType, "add"), ) - stats.Record(ctx, metrics.MessageAddFailure.M(1)) + stats.Record(ctx, metrics.MessageValidationFailure.M(1)) return false } - + stats.Record(ctx, metrics.MessageValidationSuccess.M(1)) return true } diff --git a/metrics/metrics.go b/metrics/metrics.go index e10beda8d..82b21637c 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -12,22 +12,28 @@ var ( Commit, _ = tag.NewKey("commit") RPCMethod, _ = tag.NewKey("method") PeerID, _ = tag.NewKey("peer_id") + FailureType, _ = tag.NewKey("failure_type") MessageFrom, _ = tag.NewKey("message_from") MessageTo, _ = tag.NewKey("message_to") MessageNonce, _ = tag.NewKey("message_nonce") + ReceivedFrom, _ = tag.NewKey("received_from") ) // Measures var ( - LotusInfo = stats.Int64("info", "Arbitrary counter to tag lotus info to", stats.UnitDimensionless) - ChainNodeHeight = stats.Int64("chain/node_height", "Current Height of the node", stats.UnitDimensionless) - ChainNodeWorkerHeight = stats.Int64("chain/node_worker_height", "Current Height of workers on the node", stats.UnitDimensionless) - MessageAddFailure = stats.Int64("message/add_faliure", "Counter for messages that failed to be added", stats.UnitDimensionless) - MessageDecodeFailure = stats.Int64("message/decode_faliure", "Counter for messages that failed to be decoded", stats.UnitDimensionless) - PeerCount = stats.Int64("peer/count", "Current number of FIL peers", stats.UnitDimensionless) - RPCInvalidMethod = stats.Int64("rpc/invalid_method", "Total number of invalid RPC methods called", stats.UnitDimensionless) - RPCRequestError = stats.Int64("rpc/request_error", "Total number of request errors handled", stats.UnitDimensionless) - RPCResponseError = stats.Int64("rpc/response_error", "Total number of responses errors handled", stats.UnitDimensionless) + LotusInfo = stats.Int64("info", "Arbitrary counter to tag lotus info to", stats.UnitDimensionless) + ChainNodeHeight = stats.Int64("chain/node_height", "Current Height of the node", stats.UnitDimensionless) + ChainNodeWorkerHeight = stats.Int64("chain/node_worker_height", "Current Height of workers on the node", stats.UnitDimensionless) + MessageReceived = stats.Int64("message/received", "Counter for total received messages", stats.UnitDimensionless) + MessageValidationFailure = stats.Int64("message/failure", "Counter for message validation failures", stats.UnitDimensionless) + MessageValidationSuccess = stats.Int64("message/success", "Counter for message validation successes", stats.UnitDimensionless) + BlockReceived = stats.Int64("block/received", "Counter for total received blocks", stats.UnitDimensionless) + BlockValidationFailure = stats.Int64("block/failure", "Counter for block validation failures", stats.UnitDimensionless) + BlockValidationSuccess = stats.Int64("block/success", "Counter for block validation successes", stats.UnitDimensionless) + PeerCount = stats.Int64("peer/count", "Current number of FIL peers", stats.UnitDimensionless) + RPCInvalidMethod = stats.Int64("rpc/invalid_method", "Total number of invalid RPC methods called", stats.UnitDimensionless) + RPCRequestError = stats.Int64("rpc/request_error", "Total number of request errors handled", stats.UnitDimensionless) + RPCResponseError = stats.Int64("rpc/response_error", "Total number of responses errors handled", stats.UnitDimensionless) ) // DefaultViews is an array of Consensus views for metric gathering purposes @@ -48,14 +54,30 @@ var DefaultViews = []*view.View{ Aggregation: view.LastValue(), }, &view.View{ - Measure: MessageAddFailure, + Measure: BlockReceived, Aggregation: view.Count(), - TagKeys: []tag.Key{MessageFrom, MessageTo, MessageNonce}, }, &view.View{ - Measure: MessageDecodeFailure, + Measure: BlockValidationFailure, + Aggregation: view.Count(), + TagKeys: []tag.Key{FailureType, PeerID}, + }, + &view.View{ + Measure: BlockValidationSuccess, + Aggregation: view.Count(), + }, + &view.View{ + Measure: MessageReceived, + Aggregation: view.Count(), + }, + &view.View{ + Measure: MessageValidationFailure, + Aggregation: view.Count(), + TagKeys: []tag.Key{FailureType, MessageFrom, MessageTo, MessageNonce}, + }, + &view.View{ + Measure: MessageValidationSuccess, Aggregation: view.Count(), - TagKeys: []tag.Key{PeerID}, }, &view.View{ Measure: PeerCount, From 65cd13301c86231f8eafcde46fa9af8bb78f8b81 Mon Sep 17 00:00:00 2001 From: Nate Walck Date: Sun, 1 Mar 2020 20:01:52 -0500 Subject: [PATCH 5/5] Add ReceivedFrom to block validation failure metrics --- metrics/metrics.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metrics/metrics.go b/metrics/metrics.go index 82b21637c..9ec9caefd 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -60,7 +60,7 @@ var DefaultViews = []*view.View{ &view.View{ Measure: BlockValidationFailure, Aggregation: view.Count(), - TagKeys: []tag.Key{FailureType, PeerID}, + TagKeys: []tag.Key{FailureType, PeerID, ReceivedFrom}, }, &view.View{ Measure: BlockValidationSuccess,