mpool: Add more metrics
This commit is contained in:
parent
8f426b49ec
commit
534badad2a
@ -34,6 +34,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/vm"
|
"github.com/filecoin-project/lotus/chain/vm"
|
||||||
"github.com/filecoin-project/lotus/journal"
|
"github.com/filecoin-project/lotus/journal"
|
||||||
"github.com/filecoin-project/lotus/lib/sigs"
|
"github.com/filecoin-project/lotus/lib/sigs"
|
||||||
|
"github.com/filecoin-project/lotus/metrics"
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
|
|
||||||
"github.com/raulk/clock"
|
"github.com/raulk/clock"
|
||||||
@ -577,7 +578,7 @@ func (mp *MessagePool) addLocal(ctx context.Context, m *types.SignedMessage) err
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// verifyMsgBeforeAdd verifies that the message meets the minimum criteria for block inclusio
|
// verifyMsgBeforeAdd verifies that the message meets the minimum criteria for block inclusion
|
||||||
// and whether the message has enough funds to be included in the next 20 blocks.
|
// and whether the message has enough funds to be included in the next 20 blocks.
|
||||||
// If the message is not valid for block inclusion, it returns an error.
|
// If the message is not valid for block inclusion, it returns an error.
|
||||||
// For local messages, if the message can be included in the next 20 blocks, it returns true to
|
// For local messages, if the message can be included in the next 20 blocks, it returns true to
|
||||||
@ -631,6 +632,9 @@ func (mp *MessagePool) verifyMsgBeforeAdd(m *types.SignedMessage, curTs *types.T
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) Push(ctx context.Context, m *types.SignedMessage) (cid.Cid, error) {
|
func (mp *MessagePool) Push(ctx context.Context, m *types.SignedMessage) (cid.Cid, error) {
|
||||||
|
done := metrics.Timer(ctx, metrics.MpoolPushDuration)
|
||||||
|
defer done()
|
||||||
|
|
||||||
err := mp.checkMessage(m)
|
err := mp.checkMessage(m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cid.Undef, err
|
return cid.Undef, err
|
||||||
@ -697,6 +701,9 @@ func (mp *MessagePool) checkMessage(m *types.SignedMessage) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) Add(ctx context.Context, m *types.SignedMessage) error {
|
func (mp *MessagePool) Add(ctx context.Context, m *types.SignedMessage) error {
|
||||||
|
done := metrics.Timer(ctx, metrics.MpoolAddDuration)
|
||||||
|
defer done()
|
||||||
|
|
||||||
err := mp.checkMessage(m)
|
err := mp.checkMessage(m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -752,7 +759,7 @@ func (mp *MessagePool) VerifyMsgSig(m *types.SignedMessage) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) checkBalance(ctx context.Context, m *types.SignedMessage, curTs *types.TipSet) error {
|
func (mp *MessagePool) checkBalance(ctx context.Context, m *types.SignedMessage, curTs *types.TipSet) error {
|
||||||
balance, err := mp.getStateBalance(m.Message.From, curTs)
|
balance, err := mp.getStateBalance(ctx, m.Message.From, curTs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("failed to check sender balance: %s: %w", err, ErrSoftValidationFailure)
|
return xerrors.Errorf("failed to check sender balance: %s: %w", err, ErrSoftValidationFailure)
|
||||||
}
|
}
|
||||||
@ -785,7 +792,10 @@ func (mp *MessagePool) checkBalance(ctx context.Context, m *types.SignedMessage,
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) addTs(ctx context.Context, m *types.SignedMessage, curTs *types.TipSet, local, untrusted bool) (bool, error) {
|
func (mp *MessagePool) addTs(ctx context.Context, m *types.SignedMessage, curTs *types.TipSet, local, untrusted bool) (bool, error) {
|
||||||
snonce, err := mp.getStateNonce(m.Message.From, curTs)
|
done := metrics.Timer(ctx, metrics.MpoolAddTsDuration)
|
||||||
|
defer done()
|
||||||
|
|
||||||
|
snonce, err := mp.getStateNonce(ctx, m.Message.From, curTs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure)
|
return false, xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure)
|
||||||
}
|
}
|
||||||
@ -833,7 +843,7 @@ func (mp *MessagePool) addLoaded(ctx context.Context, m *types.SignedMessage) er
|
|||||||
return xerrors.Errorf("current tipset not loaded")
|
return xerrors.Errorf("current tipset not loaded")
|
||||||
}
|
}
|
||||||
|
|
||||||
snonce, err := mp.getStateNonce(m.Message.From, curTs)
|
snonce, err := mp.getStateNonce(ctx, m.Message.From, curTs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure)
|
return xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure)
|
||||||
}
|
}
|
||||||
@ -885,7 +895,7 @@ func (mp *MessagePool) addLocked(ctx context.Context, m *types.SignedMessage, st
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
nonce, err := mp.getStateNonce(m.Message.From, mp.curTs)
|
nonce, err := mp.getStateNonce(ctx, m.Message.From, mp.curTs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("failed to get initial actor nonce: %w", err)
|
return xerrors.Errorf("failed to get initial actor nonce: %w", err)
|
||||||
}
|
}
|
||||||
@ -939,7 +949,7 @@ func (mp *MessagePool) GetNonce(ctx context.Context, addr address.Address) (uint
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) getNonceLocked(ctx context.Context, addr address.Address, curTs *types.TipSet) (uint64, error) {
|
func (mp *MessagePool) getNonceLocked(ctx context.Context, addr address.Address, curTs *types.TipSet) (uint64, error) {
|
||||||
stateNonce, err := mp.getStateNonce(addr, curTs) // sanity check
|
stateNonce, err := mp.getStateNonce(ctx, addr, curTs) // sanity check
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
@ -963,7 +973,10 @@ func (mp *MessagePool) getNonceLocked(ctx context.Context, addr address.Address,
|
|||||||
return stateNonce, nil
|
return stateNonce, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) getStateNonce(addr address.Address, curTs *types.TipSet) (uint64, error) {
|
func (mp *MessagePool) getStateNonce(ctx context.Context, addr address.Address, curTs *types.TipSet) (uint64, error) {
|
||||||
|
done := metrics.Timer(ctx, metrics.MpoolGetNonceDuration)
|
||||||
|
defer done()
|
||||||
|
|
||||||
act, err := mp.api.GetActorAfter(addr, curTs)
|
act, err := mp.api.GetActorAfter(addr, curTs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
@ -972,7 +985,10 @@ func (mp *MessagePool) getStateNonce(addr address.Address, curTs *types.TipSet)
|
|||||||
return act.Nonce, nil
|
return act.Nonce, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) getStateBalance(addr address.Address, ts *types.TipSet) (types.BigInt, error) {
|
func (mp *MessagePool) getStateBalance(ctx context.Context, addr address.Address, ts *types.TipSet) (types.BigInt, error) {
|
||||||
|
done := metrics.Timer(ctx, metrics.MpoolGetBalanceDuration)
|
||||||
|
defer done()
|
||||||
|
|
||||||
act, err := mp.api.GetActorAfter(addr, ts)
|
act, err := mp.api.GetActorAfter(addr, ts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return types.EmptyInt, err
|
return types.EmptyInt, err
|
||||||
|
@ -507,6 +507,12 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs
|
|||||||
return mv.validateLocalMessage(ctx, msg)
|
return mv.validateLocalMessage(ctx, msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
defer func() {
|
||||||
|
ms := time.Now().Sub(start).Microseconds()
|
||||||
|
stats.Record(ctx, metrics.MessageValidationDuration.M(float64(ms)/1000))
|
||||||
|
}()
|
||||||
|
|
||||||
stats.Record(ctx, metrics.MessageReceived.M(1))
|
stats.Record(ctx, metrics.MessageReceived.M(1))
|
||||||
m, err := types.DecodeSignedMessage(msg.Message.GetData())
|
m, err := types.DecodeSignedMessage(msg.Message.GetData())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -538,6 +544,12 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs
|
|||||||
return pubsub.ValidationReject
|
return pubsub.ValidationReject
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ctx, _ = tag.New(
|
||||||
|
ctx,
|
||||||
|
tag.Upsert(metrics.MsgValid, "true"),
|
||||||
|
)
|
||||||
|
|
||||||
stats.Record(ctx, metrics.MessageValidationSuccess.M(1))
|
stats.Record(ctx, metrics.MessageValidationSuccess.M(1))
|
||||||
return pubsub.ValidationAccept
|
return pubsub.ValidationAccept
|
||||||
}
|
}
|
||||||
@ -547,6 +559,13 @@ func (mv *MessageValidator) validateLocalMessage(ctx context.Context, msg *pubsu
|
|||||||
ctx,
|
ctx,
|
||||||
tag.Upsert(metrics.Local, "true"),
|
tag.Upsert(metrics.Local, "true"),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
defer func() {
|
||||||
|
ms := time.Now().Sub(start).Microseconds()
|
||||||
|
stats.Record(ctx, metrics.MessageValidationDuration.M(float64(ms)/1000))
|
||||||
|
}()
|
||||||
|
|
||||||
// do some lightweight validation
|
// do some lightweight validation
|
||||||
stats.Record(ctx, metrics.MessagePublished.M(1))
|
stats.Record(ctx, metrics.MessagePublished.M(1))
|
||||||
|
|
||||||
@ -581,6 +600,11 @@ func (mv *MessageValidator) validateLocalMessage(ctx context.Context, msg *pubsu
|
|||||||
return pubsub.ValidationIgnore
|
return pubsub.ValidationIgnore
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ctx, _ = tag.New(
|
||||||
|
ctx,
|
||||||
|
tag.Upsert(metrics.MsgValid, "true"),
|
||||||
|
)
|
||||||
|
|
||||||
stats.Record(ctx, metrics.MessageValidationSuccess.M(1))
|
stats.Record(ctx, metrics.MessageValidationSuccess.M(1))
|
||||||
return pubsub.ValidationAccept
|
return pubsub.ValidationAccept
|
||||||
}
|
}
|
||||||
|
@ -38,6 +38,7 @@ var (
|
|||||||
MessageTo, _ = tag.NewKey("message_to")
|
MessageTo, _ = tag.NewKey("message_to")
|
||||||
MessageNonce, _ = tag.NewKey("message_nonce")
|
MessageNonce, _ = tag.NewKey("message_nonce")
|
||||||
ReceivedFrom, _ = tag.NewKey("received_from")
|
ReceivedFrom, _ = tag.NewKey("received_from")
|
||||||
|
MsgValid, _ = tag.NewKey("message_valid")
|
||||||
Endpoint, _ = tag.NewKey("endpoint")
|
Endpoint, _ = tag.NewKey("endpoint")
|
||||||
APIInterface, _ = tag.NewKey("api") // to distinguish between gateway api and full node api endpoint calls
|
APIInterface, _ = tag.NewKey("api") // to distinguish between gateway api and full node api endpoint calls
|
||||||
|
|
||||||
@ -61,6 +62,12 @@ var (
|
|||||||
MessageReceived = stats.Int64("message/received", "Counter for total received messages", stats.UnitDimensionless)
|
MessageReceived = stats.Int64("message/received", "Counter for total received messages", stats.UnitDimensionless)
|
||||||
MessageValidationFailure = stats.Int64("message/failure", "Counter for message validation failures", stats.UnitDimensionless)
|
MessageValidationFailure = stats.Int64("message/failure", "Counter for message validation failures", stats.UnitDimensionless)
|
||||||
MessageValidationSuccess = stats.Int64("message/success", "Counter for message validation successes", stats.UnitDimensionless)
|
MessageValidationSuccess = stats.Int64("message/success", "Counter for message validation successes", stats.UnitDimensionless)
|
||||||
|
MessageValidationDuration = stats.Float64("message/validation_ms", "Duration of message validation", stats.UnitMilliseconds)
|
||||||
|
MpoolGetNonceDuration = stats.Float64("mpool/getnonce_ms", "Duration of getStateNonce in mpool", stats.UnitMilliseconds)
|
||||||
|
MpoolGetBalanceDuration = stats.Float64("mpool/getbalance_ms", "Duration of getStateBalance in mpool", stats.UnitMilliseconds)
|
||||||
|
MpoolAddTsDuration = stats.Float64("mpool/addts_ms", "Duration of addTs in mpool", stats.UnitMilliseconds)
|
||||||
|
MpoolAddDuration = stats.Float64("mpool/add_ms", "Duration of Add in mpool", stats.UnitMilliseconds)
|
||||||
|
MpoolPushDuration = stats.Float64("mpool/push_ms", "Duration of Push in mpool", stats.UnitMilliseconds)
|
||||||
BlockPublished = stats.Int64("block/published", "Counter for total locally published blocks", stats.UnitDimensionless)
|
BlockPublished = stats.Int64("block/published", "Counter for total locally published blocks", stats.UnitDimensionless)
|
||||||
BlockReceived = stats.Int64("block/received", "Counter for total received blocks", stats.UnitDimensionless)
|
BlockReceived = stats.Int64("block/received", "Counter for total received blocks", stats.UnitDimensionless)
|
||||||
BlockValidationFailure = stats.Int64("block/failure", "Counter for block validation failures", stats.UnitDimensionless)
|
BlockValidationFailure = stats.Int64("block/failure", "Counter for block validation failures", stats.UnitDimensionless)
|
||||||
@ -163,6 +170,31 @@ var (
|
|||||||
Measure: MessageValidationSuccess,
|
Measure: MessageValidationSuccess,
|
||||||
Aggregation: view.Count(),
|
Aggregation: view.Count(),
|
||||||
}
|
}
|
||||||
|
MessageValidationDurationView = &view.View{
|
||||||
|
Measure: MessageValidationDuration,
|
||||||
|
Aggregation: defaultMillisecondsDistribution,
|
||||||
|
TagKeys: []tag.Key{MsgValid, Local},
|
||||||
|
}
|
||||||
|
MpoolGetNonceDurationView = &view.View{
|
||||||
|
Measure: MpoolGetNonceDuration,
|
||||||
|
Aggregation: defaultMillisecondsDistribution,
|
||||||
|
}
|
||||||
|
MpoolGetBalanceDurationView = &view.View{
|
||||||
|
Measure: MpoolGetBalanceDuration,
|
||||||
|
Aggregation: defaultMillisecondsDistribution,
|
||||||
|
}
|
||||||
|
MpoolAddTsDurationView = &view.View{
|
||||||
|
Measure: MpoolAddTsDuration,
|
||||||
|
Aggregation: defaultMillisecondsDistribution,
|
||||||
|
}
|
||||||
|
MpoolAddDurationView = &view.View{
|
||||||
|
Measure: MpoolAddDuration,
|
||||||
|
Aggregation: defaultMillisecondsDistribution,
|
||||||
|
}
|
||||||
|
MpoolPushDurationView = &view.View{
|
||||||
|
Measure: MpoolPushDuration,
|
||||||
|
Aggregation: defaultMillisecondsDistribution,
|
||||||
|
}
|
||||||
PeerCountView = &view.View{
|
PeerCountView = &view.View{
|
||||||
Measure: PeerCount,
|
Measure: PeerCount,
|
||||||
Aggregation: view.LastValue(),
|
Aggregation: view.LastValue(),
|
||||||
@ -278,6 +310,12 @@ var ChainNodeViews = append([]*view.View{
|
|||||||
MessageReceivedView,
|
MessageReceivedView,
|
||||||
MessageValidationFailureView,
|
MessageValidationFailureView,
|
||||||
MessageValidationSuccessView,
|
MessageValidationSuccessView,
|
||||||
|
MessageValidationDurationView,
|
||||||
|
MpoolGetNonceDurationView,
|
||||||
|
MpoolGetBalanceDurationView,
|
||||||
|
MpoolAddTsDurationView,
|
||||||
|
MpoolAddDurationView,
|
||||||
|
MpoolPushDurationView,
|
||||||
PubsubPublishMessageView,
|
PubsubPublishMessageView,
|
||||||
PubsubDeliverMessageView,
|
PubsubDeliverMessageView,
|
||||||
PubsubRejectMessageView,
|
PubsubRejectMessageView,
|
||||||
|
Loading…
Reference in New Issue
Block a user