From 534badad2a75d5e5c2bac84efc74f753bf883c8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 11 Jun 2021 13:19:26 +0200 Subject: [PATCH] mpool: Add more metrics --- chain/messagepool/messagepool.go | 32 ++++++++++++++++++++------- chain/sub/incoming.go | 24 ++++++++++++++++++++ metrics/metrics.go | 38 ++++++++++++++++++++++++++++++++ 3 files changed, 86 insertions(+), 8 deletions(-) diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index 68390885c..59f7b0f75 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -34,6 +34,7 @@ import ( "github.com/filecoin-project/lotus/chain/vm" "github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/lib/sigs" + "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/raulk/clock" @@ -577,7 +578,7 @@ func (mp *MessagePool) addLocal(ctx context.Context, m *types.SignedMessage) err return nil } -// verifyMsgBeforeAdd verifies that the message meets the minimum criteria for block inclusio +// verifyMsgBeforeAdd verifies that the message meets the minimum criteria for block inclusion // and whether the message has enough funds to be included in the next 20 blocks. // If the message is not valid for block inclusion, it returns an error. // For local messages, if the message can be included in the next 20 blocks, it returns true to @@ -631,6 +632,9 @@ func (mp *MessagePool) verifyMsgBeforeAdd(m *types.SignedMessage, curTs *types.T } func (mp *MessagePool) Push(ctx context.Context, m *types.SignedMessage) (cid.Cid, error) { + done := metrics.Timer(ctx, metrics.MpoolPushDuration) + defer done() + err := mp.checkMessage(m) if err != nil { return cid.Undef, err @@ -697,6 +701,9 @@ func (mp *MessagePool) checkMessage(m *types.SignedMessage) error { } func (mp *MessagePool) Add(ctx context.Context, m *types.SignedMessage) error { + done := metrics.Timer(ctx, metrics.MpoolAddDuration) + defer done() + err := mp.checkMessage(m) if err != nil { return err @@ -752,7 +759,7 @@ func (mp *MessagePool) VerifyMsgSig(m *types.SignedMessage) error { } func (mp *MessagePool) checkBalance(ctx context.Context, m *types.SignedMessage, curTs *types.TipSet) error { - balance, err := mp.getStateBalance(m.Message.From, curTs) + balance, err := mp.getStateBalance(ctx, m.Message.From, curTs) if err != nil { return xerrors.Errorf("failed to check sender balance: %s: %w", err, ErrSoftValidationFailure) } @@ -785,7 +792,10 @@ func (mp *MessagePool) checkBalance(ctx context.Context, m *types.SignedMessage, } func (mp *MessagePool) addTs(ctx context.Context, m *types.SignedMessage, curTs *types.TipSet, local, untrusted bool) (bool, error) { - snonce, err := mp.getStateNonce(m.Message.From, curTs) + done := metrics.Timer(ctx, metrics.MpoolAddTsDuration) + defer done() + + snonce, err := mp.getStateNonce(ctx, m.Message.From, curTs) if err != nil { return false, xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure) } @@ -833,7 +843,7 @@ func (mp *MessagePool) addLoaded(ctx context.Context, m *types.SignedMessage) er return xerrors.Errorf("current tipset not loaded") } - snonce, err := mp.getStateNonce(m.Message.From, curTs) + snonce, err := mp.getStateNonce(ctx, m.Message.From, curTs) if err != nil { return xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure) } @@ -885,7 +895,7 @@ func (mp *MessagePool) addLocked(ctx context.Context, m *types.SignedMessage, st } if !ok { - nonce, err := mp.getStateNonce(m.Message.From, mp.curTs) + nonce, err := mp.getStateNonce(ctx, m.Message.From, mp.curTs) if err != nil { return xerrors.Errorf("failed to get initial actor nonce: %w", err) } @@ -939,7 +949,7 @@ func (mp *MessagePool) GetNonce(ctx context.Context, addr address.Address) (uint } func (mp *MessagePool) getNonceLocked(ctx context.Context, addr address.Address, curTs *types.TipSet) (uint64, error) { - stateNonce, err := mp.getStateNonce(addr, curTs) // sanity check + stateNonce, err := mp.getStateNonce(ctx, addr, curTs) // sanity check if err != nil { return 0, err } @@ -963,7 +973,10 @@ func (mp *MessagePool) getNonceLocked(ctx context.Context, addr address.Address, return stateNonce, nil } -func (mp *MessagePool) getStateNonce(addr address.Address, curTs *types.TipSet) (uint64, error) { +func (mp *MessagePool) getStateNonce(ctx context.Context, addr address.Address, curTs *types.TipSet) (uint64, error) { + done := metrics.Timer(ctx, metrics.MpoolGetNonceDuration) + defer done() + act, err := mp.api.GetActorAfter(addr, curTs) if err != nil { return 0, err @@ -972,7 +985,10 @@ func (mp *MessagePool) getStateNonce(addr address.Address, curTs *types.TipSet) return act.Nonce, nil } -func (mp *MessagePool) getStateBalance(addr address.Address, ts *types.TipSet) (types.BigInt, error) { +func (mp *MessagePool) getStateBalance(ctx context.Context, addr address.Address, ts *types.TipSet) (types.BigInt, error) { + done := metrics.Timer(ctx, metrics.MpoolGetBalanceDuration) + defer done() + act, err := mp.api.GetActorAfter(addr, ts) if err != nil { return types.EmptyInt, err diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index 65447bc11..7452d31a9 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -507,6 +507,12 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs return mv.validateLocalMessage(ctx, msg) } + start := time.Now() + defer func() { + ms := time.Now().Sub(start).Microseconds() + stats.Record(ctx, metrics.MessageValidationDuration.M(float64(ms)/1000)) + }() + stats.Record(ctx, metrics.MessageReceived.M(1)) m, err := types.DecodeSignedMessage(msg.Message.GetData()) if err != nil { @@ -538,6 +544,12 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs return pubsub.ValidationReject } } + + ctx, _ = tag.New( + ctx, + tag.Upsert(metrics.MsgValid, "true"), + ) + stats.Record(ctx, metrics.MessageValidationSuccess.M(1)) return pubsub.ValidationAccept } @@ -547,6 +559,13 @@ func (mv *MessageValidator) validateLocalMessage(ctx context.Context, msg *pubsu ctx, tag.Upsert(metrics.Local, "true"), ) + + start := time.Now() + defer func() { + ms := time.Now().Sub(start).Microseconds() + stats.Record(ctx, metrics.MessageValidationDuration.M(float64(ms)/1000)) + }() + // do some lightweight validation stats.Record(ctx, metrics.MessagePublished.M(1)) @@ -581,6 +600,11 @@ func (mv *MessageValidator) validateLocalMessage(ctx context.Context, msg *pubsu return pubsub.ValidationIgnore } + ctx, _ = tag.New( + ctx, + tag.Upsert(metrics.MsgValid, "true"), + ) + stats.Record(ctx, metrics.MessageValidationSuccess.M(1)) return pubsub.ValidationAccept } diff --git a/metrics/metrics.go b/metrics/metrics.go index 5428a81bc..08c20e634 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -38,6 +38,7 @@ var ( MessageTo, _ = tag.NewKey("message_to") MessageNonce, _ = tag.NewKey("message_nonce") ReceivedFrom, _ = tag.NewKey("received_from") + MsgValid, _ = tag.NewKey("message_valid") Endpoint, _ = tag.NewKey("endpoint") APIInterface, _ = tag.NewKey("api") // to distinguish between gateway api and full node api endpoint calls @@ -61,6 +62,12 @@ var ( MessageReceived = stats.Int64("message/received", "Counter for total received messages", stats.UnitDimensionless) MessageValidationFailure = stats.Int64("message/failure", "Counter for message validation failures", stats.UnitDimensionless) MessageValidationSuccess = stats.Int64("message/success", "Counter for message validation successes", stats.UnitDimensionless) + MessageValidationDuration = stats.Float64("message/validation_ms", "Duration of message validation", stats.UnitMilliseconds) + MpoolGetNonceDuration = stats.Float64("mpool/getnonce_ms", "Duration of getStateNonce in mpool", stats.UnitMilliseconds) + MpoolGetBalanceDuration = stats.Float64("mpool/getbalance_ms", "Duration of getStateBalance in mpool", stats.UnitMilliseconds) + MpoolAddTsDuration = stats.Float64("mpool/addts_ms", "Duration of addTs in mpool", stats.UnitMilliseconds) + MpoolAddDuration = stats.Float64("mpool/add_ms", "Duration of Add in mpool", stats.UnitMilliseconds) + MpoolPushDuration = stats.Float64("mpool/push_ms", "Duration of Push in mpool", stats.UnitMilliseconds) BlockPublished = stats.Int64("block/published", "Counter for total locally published blocks", stats.UnitDimensionless) BlockReceived = stats.Int64("block/received", "Counter for total received blocks", stats.UnitDimensionless) BlockValidationFailure = stats.Int64("block/failure", "Counter for block validation failures", stats.UnitDimensionless) @@ -163,6 +170,31 @@ var ( Measure: MessageValidationSuccess, Aggregation: view.Count(), } + MessageValidationDurationView = &view.View{ + Measure: MessageValidationDuration, + Aggregation: defaultMillisecondsDistribution, + TagKeys: []tag.Key{MsgValid, Local}, + } + MpoolGetNonceDurationView = &view.View{ + Measure: MpoolGetNonceDuration, + Aggregation: defaultMillisecondsDistribution, + } + MpoolGetBalanceDurationView = &view.View{ + Measure: MpoolGetBalanceDuration, + Aggregation: defaultMillisecondsDistribution, + } + MpoolAddTsDurationView = &view.View{ + Measure: MpoolAddTsDuration, + Aggregation: defaultMillisecondsDistribution, + } + MpoolAddDurationView = &view.View{ + Measure: MpoolAddDuration, + Aggregation: defaultMillisecondsDistribution, + } + MpoolPushDurationView = &view.View{ + Measure: MpoolPushDuration, + Aggregation: defaultMillisecondsDistribution, + } PeerCountView = &view.View{ Measure: PeerCount, Aggregation: view.LastValue(), @@ -278,6 +310,12 @@ var ChainNodeViews = append([]*view.View{ MessageReceivedView, MessageValidationFailureView, MessageValidationSuccessView, + MessageValidationDurationView, + MpoolGetNonceDurationView, + MpoolGetBalanceDurationView, + MpoolAddTsDurationView, + MpoolAddDurationView, + MpoolPushDurationView, PubsubPublishMessageView, PubsubDeliverMessageView, PubsubRejectMessageView,