Use head notif func for current node height, add pubsub metrics

This commit is contained in:
Nate Walck 2020-03-01 19:26:09 -05:00
parent 7db39115e8
commit 33af2409e8
4 changed files with 57 additions and 22 deletions

View File

@ -13,6 +13,8 @@ import (
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/state"
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/lotus/metrics"
"go.opencensus.io/stats"
"go.opencensus.io/trace"
"go.uber.org/multierr"
@ -100,7 +102,15 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls *types.VMSy
return nil
}
cs.headChangeNotifs = append(cs.headChangeNotifs, hcnf)
hcmetric := func(rev, app []*types.TipSet) error {
ctx := context.Background()
for _, r := range app {
stats.Record(ctx, metrics.ChainNodeHeight.M(int64(r.Height())))
}
return nil
}
cs.headChangeNotifs = append(cs.headChangeNotifs, hcnf, hcmetric)
return cs
}

View File

@ -2,6 +2,7 @@ package sub
import (
"context"
"fmt"
"time"
lru "github.com/hashicorp/golang-lru"
@ -10,11 +11,14 @@ import (
connmgr "github.com/libp2p/go-libp2p-core/connmgr"
peer "github.com/libp2p/go-libp2p-peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/metrics"
)
var log = logging.Logger("sub")
@ -162,14 +166,23 @@ func NewMessageValidator(mp *messagepool.MessagePool) *MessageValidator {
}
func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) bool {
ctx, _ = tag.New(ctx, tag.Insert(metrics.PeerID, pid.String()))
m, err := types.DecodeSignedMessage(msg.Message.GetData())
if err != nil {
log.Warnf("failed to decode incoming message: %s", err)
stats.Record(ctx, metrics.MessageDecodeFailure.M(1))
return false
}
if err := mv.mpool.Add(m); err != nil {
log.Warnf("failed to add message from network to message pool (From: %s, To: %s, Nonce: %d, Value: %s): %s", m.Message.From, m.Message.To, m.Message.Nonce, types.FIL(m.Message.Value), err)
ctx, _ = tag.New(
ctx,
tag.Insert(metrics.MessageFrom, m.Message.From.String()),
tag.Insert(metrics.MessageTo, m.Message.To.String()),
tag.Insert(metrics.MessageNonce, fmt.Sprint(m.Message.Nonce)),
)
stats.Record(ctx, metrics.MessageAddFailure.M(1))
return false
}

View File

@ -396,9 +396,6 @@ func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error {
)
}
// Record current network chain height when sync is called
stats.Record(ctx, metrics.ChainHeight.M(int64(maybeHead.Height())))
if syncer.store.GetHeaviestTipSet().ParentWeight().GreaterThan(maybeHead.ParentWeight()) {
return nil
}
@ -1043,8 +1040,7 @@ func (syncer *Syncer) syncMessagesAndCheckState(ctx context.Context, headers []*
return xerrors.Errorf("message processing failed: %w", err)
}
// Set current node sync height
stats.Record(ctx, metrics.ChainNodeHeight.M(int64(fts.TipSet().Height())))
stats.Record(ctx, metrics.ChainNodeWorkerHeight.M(int64(fts.TipSet().Height())))
ss.SetHeight(fts.TipSet().Height())
return nil

View File

@ -11,13 +11,19 @@ var (
Version, _ = tag.NewKey("version")
Commit, _ = tag.NewKey("commit")
RPCMethod, _ = tag.NewKey("method")
PeerID, _ = tag.NewKey("peer_id")
MessageFrom, _ = tag.NewKey("message_from")
MessageTo, _ = tag.NewKey("message_to")
MessageNonce, _ = tag.NewKey("message_nonce")
)
// Measures
var (
LotusInfo = stats.Int64("info", "Arbitrary counter to tag lotus info to", stats.UnitDimensionless)
ChainHeight = stats.Int64("chain/height", "Current Height of the chain", stats.UnitDimensionless)
ChainNodeHeight = stats.Int64("chain/node_height", "Current Height of the node", stats.UnitDimensionless)
ChainNodeWorkerHeight = stats.Int64("chain/node_worker_height", "Current Height of workers on the node", stats.UnitDimensionless)
MessageAddFailure = stats.Int64("message/add_faliure", "Counter for messages that failed to be added", stats.UnitDimensionless)
MessageDecodeFailure = stats.Int64("message/decode_faliure", "Counter for messages that failed to be decoded", stats.UnitDimensionless)
PeerCount = stats.Int64("peer/count", "Current number of FIL peers", stats.UnitDimensionless)
RPCInvalidMethod = stats.Int64("rpc/invalid_method", "Total number of invalid RPC methods called", stats.UnitDimensionless)
RPCRequestError = stats.Int64("rpc/request_error", "Total number of request errors handled", stats.UnitDimensionless)
@ -34,11 +40,25 @@ var DefaultViews = []*view.View{
TagKeys: []tag.Key{Version, Commit},
},
&view.View{
Measure: ChainHeight,
Measure: ChainNodeHeight,
Aggregation: view.LastValue(),
},
&view.View{
Measure: ChainNodeHeight,
Measure: ChainNodeWorkerHeight,
Aggregation: view.LastValue(),
},
&view.View{
Measure: MessageAddFailure,
Aggregation: view.Count(),
TagKeys: []tag.Key{MessageFrom, MessageTo, MessageNonce},
},
&view.View{
Measure: MessageDecodeFailure,
Aggregation: view.Count(),
TagKeys: []tag.Key{PeerID},
},
&view.View{
Measure: PeerCount,
Aggregation: view.LastValue(),
},
// All RPC related metrics should at the very least tag the RPCMethod
@ -57,8 +77,4 @@ var DefaultViews = []*view.View{
Aggregation: view.Count(),
TagKeys: []tag.Key{RPCMethod},
},
&view.View{
Measure: PeerCount,
Aggregation: view.LastValue(),
},
}