Merge pull request #7398 from ChainSafe/libp2p-pubsub-tracer
Lotus extended pubsub tracer
This commit is contained in:
commit
a10c014ba6
@ -98,6 +98,35 @@
|
||||
# env var: LOTUS_PUBSUB_REMOTETRACER
|
||||
#RemoteTracer = ""
|
||||
|
||||
# Path to file that will be used to output tracer content in JSON format.
|
||||
# If present tracer will save data to defined file.
|
||||
# Format: file path
|
||||
#
|
||||
# type: string
|
||||
# env var: LOTUS_PUBSUB_JSONTRACER
|
||||
#JsonTracer = ""
|
||||
|
||||
# Connection string for elasticsearch instance.
|
||||
# If present tracer will save data to elasticsearch.
|
||||
# Format: https://<username>:<password>@<elasticsearch_url>:<port>/
|
||||
#
|
||||
# type: string
|
||||
# env var: LOTUS_PUBSUB_ELASTICSEARCHTRACER
|
||||
#ElasticSearchTracer = ""
|
||||
|
||||
# Name of elasticsearch index that will be used to save tracer data.
|
||||
# This property is used only if ElasticSearchTracer propery is set.
|
||||
#
|
||||
# type: string
|
||||
# env var: LOTUS_PUBSUB_ELASTICSEARCHINDEX
|
||||
#ElasticSearchIndex = ""
|
||||
|
||||
# Auth token that will be passed with logs to elasticsearch - used for weighted peers score.
|
||||
#
|
||||
# type: string
|
||||
# env var: LOTUS_PUBSUB_TRACERSOURCEAUTH
|
||||
#TracerSourceAuth = ""
|
||||
|
||||
|
||||
[Client]
|
||||
# type: bool
|
||||
|
@ -98,6 +98,35 @@
|
||||
# env var: LOTUS_PUBSUB_REMOTETRACER
|
||||
#RemoteTracer = ""
|
||||
|
||||
# Path to file that will be used to output tracer content in JSON format.
|
||||
# If present tracer will save data to defined file.
|
||||
# Format: file path
|
||||
#
|
||||
# type: string
|
||||
# env var: LOTUS_PUBSUB_JSONTRACER
|
||||
#JsonTracer = ""
|
||||
|
||||
# Connection string for elasticsearch instance.
|
||||
# If present tracer will save data to elasticsearch.
|
||||
# Format: https://<username>:<password>@<elasticsearch_url>:<port>/
|
||||
#
|
||||
# type: string
|
||||
# env var: LOTUS_PUBSUB_ELASTICSEARCHTRACER
|
||||
#ElasticSearchTracer = ""
|
||||
|
||||
# Name of elasticsearch index that will be used to save tracer data.
|
||||
# This property is used only if ElasticSearchTracer propery is set.
|
||||
#
|
||||
# type: string
|
||||
# env var: LOTUS_PUBSUB_ELASTICSEARCHINDEX
|
||||
#ElasticSearchIndex = ""
|
||||
|
||||
# Auth token that will be passed with logs to elasticsearch - used for weighted peers score.
|
||||
#
|
||||
# type: string
|
||||
# env var: LOTUS_PUBSUB_TRACERSOURCEAUTH
|
||||
#TracerSourceAuth = ""
|
||||
|
||||
|
||||
[Subsystems]
|
||||
# type: bool
|
||||
|
1
go.mod
1
go.mod
@ -23,6 +23,7 @@ require (
|
||||
github.com/drand/drand v1.3.0
|
||||
github.com/drand/kyber v1.1.7
|
||||
github.com/dustin/go-humanize v1.0.0
|
||||
github.com/elastic/go-elasticsearch/v7 v7.14.0
|
||||
github.com/elastic/go-sysinfo v1.7.0
|
||||
github.com/elastic/gosigar v0.14.2
|
||||
github.com/etclabscore/go-openrpc-reflect v0.0.36
|
||||
|
2
go.sum
2
go.sum
@ -266,6 +266,8 @@ github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5m
|
||||
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
|
||||
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
|
||||
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
|
||||
github.com/elastic/go-elasticsearch/v7 v7.14.0 h1:extp3jos/rwJn3J+lgbaGlwAgs0TVsIHme00GyNAyX4=
|
||||
github.com/elastic/go-elasticsearch/v7 v7.14.0/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
|
||||
github.com/elastic/go-sysinfo v1.7.0 h1:4vVvcfi255+8+TyQ7TYUTEK3A+G8v5FLE+ZKYL1z1Dg=
|
||||
github.com/elastic/go-sysinfo v1.7.0/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0=
|
||||
github.com/elastic/go-windows v1.0.0 h1:qLURgZFkkrYyTTkvYpsZIgf83AUsdIHfvlJaqaZ7aSY=
|
||||
|
@ -788,6 +788,35 @@ Type: Array of multiaddress peerinfo strings, must include peerid (/p2p/12D3K...
|
||||
|
||||
Comment: ``,
|
||||
},
|
||||
{
|
||||
Name: "JsonTracer",
|
||||
Type: "string",
|
||||
|
||||
Comment: `Path to file that will be used to output tracer content in JSON format.
|
||||
If present tracer will save data to defined file.
|
||||
Format: file path`,
|
||||
},
|
||||
{
|
||||
Name: "ElasticSearchTracer",
|
||||
Type: "string",
|
||||
|
||||
Comment: `Connection string for elasticsearch instance.
|
||||
If present tracer will save data to elasticsearch.
|
||||
Format: https://<username>:<password>@<elasticsearch_url>:<port>/`,
|
||||
},
|
||||
{
|
||||
Name: "ElasticSearchIndex",
|
||||
Type: "string",
|
||||
|
||||
Comment: `Name of elasticsearch index that will be used to save tracer data.
|
||||
This property is used only if ElasticSearchTracer propery is set.`,
|
||||
},
|
||||
{
|
||||
Name: "TracerSourceAuth",
|
||||
Type: "string",
|
||||
|
||||
Comment: `Auth token that will be passed with logs to elasticsearch - used for weighted peers score.`,
|
||||
},
|
||||
},
|
||||
"RetrievalPricing": []DocField{
|
||||
{
|
||||
|
@ -563,6 +563,19 @@ type Pubsub struct {
|
||||
DirectPeers []string
|
||||
IPColocationWhitelist []string
|
||||
RemoteTracer string
|
||||
// Path to file that will be used to output tracer content in JSON format.
|
||||
// If present tracer will save data to defined file.
|
||||
// Format: file path
|
||||
JsonTracer string
|
||||
// Connection string for elasticsearch instance.
|
||||
// If present tracer will save data to elasticsearch.
|
||||
// Format: https://<username>:<password>@<elasticsearch_url>:<port>/
|
||||
ElasticSearchTracer string
|
||||
// Name of elasticsearch index that will be used to save tracer data.
|
||||
// This property is used only if ElasticSearchTracer propery is set.
|
||||
ElasticSearchIndex string
|
||||
// Auth token that will be passed with logs to elasticsearch - used for weighted peers score.
|
||||
TracerSourceAuth string
|
||||
}
|
||||
|
||||
type Chainstore struct {
|
||||
|
@ -21,6 +21,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/node/config"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
"github.com/filecoin-project/lotus/node/modules/helpers"
|
||||
"github.com/filecoin-project/lotus/node/modules/tracer"
|
||||
)
|
||||
|
||||
func init() {
|
||||
@ -49,6 +50,30 @@ func ScoreKeeper() *dtypes.ScoreKeeper {
|
||||
return new(dtypes.ScoreKeeper)
|
||||
}
|
||||
|
||||
type PeerScoreTracker interface {
|
||||
UpdatePeerScore(scores map[peer.ID]*pubsub.PeerScoreSnapshot)
|
||||
}
|
||||
|
||||
type peerScoreTracker struct {
|
||||
sk *dtypes.ScoreKeeper
|
||||
lt tracer.LotusTracer
|
||||
}
|
||||
|
||||
func newPeerScoreTracker(lt tracer.LotusTracer, sk *dtypes.ScoreKeeper) PeerScoreTracker {
|
||||
return &peerScoreTracker{
|
||||
sk: sk,
|
||||
lt: lt,
|
||||
}
|
||||
}
|
||||
|
||||
func (pst *peerScoreTracker) UpdatePeerScore(scores map[peer.ID]*pubsub.PeerScoreSnapshot) {
|
||||
if pst.lt != nil {
|
||||
pst.lt.PeerScores(scores)
|
||||
}
|
||||
|
||||
pst.sk.Update(scores)
|
||||
}
|
||||
|
||||
type GossipIn struct {
|
||||
fx.In
|
||||
Mctx helpers.MetricsCtx
|
||||
@ -291,7 +316,6 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
|
||||
OpportunisticGraftThreshold: OpportunisticGraftScoreThreshold,
|
||||
},
|
||||
),
|
||||
pubsub.WithPeerScoreInspect(in.Sk.Update, 10*time.Second),
|
||||
}
|
||||
|
||||
// enable Peer eXchange on bootstrappers
|
||||
@ -361,6 +385,27 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
|
||||
pubsub.NewAllowlistSubscriptionFilter(allowTopics...),
|
||||
100)))
|
||||
|
||||
var transports []tracer.TracerTransport
|
||||
if in.Cfg.JsonTracer != "" {
|
||||
jsonTransport, err := tracer.NewJsonTracerTransport(in.Cfg.JsonTracer)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
transports = append(transports, jsonTransport)
|
||||
}
|
||||
if in.Cfg.ElasticSearchTracer != "" {
|
||||
elasticSearchTransport, err := tracer.NewElasticSearchTransport(
|
||||
in.Cfg.ElasticSearchTracer,
|
||||
in.Cfg.ElasticSearchIndex,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
transports = append(transports, elasticSearchTransport)
|
||||
}
|
||||
lt := tracer.NewLotusTracer(transports, in.Host.ID(), in.Cfg.TracerSourceAuth)
|
||||
|
||||
// tracer
|
||||
if in.Cfg.RemoteTracer != "" {
|
||||
a, err := ma.NewMultiaddr(in.Cfg.RemoteTracer)
|
||||
@ -378,12 +423,18 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
trw := newTracerWrapper(tr, build.BlocksTopic(in.Nn))
|
||||
pst := newPeerScoreTracker(lt, in.Sk)
|
||||
trw := newTracerWrapper(tr, lt, build.BlocksTopic(in.Nn))
|
||||
|
||||
options = append(options, pubsub.WithEventTracer(trw))
|
||||
options = append(options, pubsub.WithPeerScoreInspect(pst.UpdatePeerScore, 10*time.Second))
|
||||
} else {
|
||||
// still instantiate a tracer for collecting metrics
|
||||
trw := newTracerWrapper(nil)
|
||||
trw := newTracerWrapper(nil, lt)
|
||||
options = append(options, pubsub.WithEventTracer(trw))
|
||||
|
||||
pst := newPeerScoreTracker(lt, in.Sk)
|
||||
options = append(options, pubsub.WithPeerScoreInspect(pst.UpdatePeerScore, 10*time.Second))
|
||||
}
|
||||
|
||||
return pubsub.NewGossipSub(helpers.LifecycleCtx(in.Mctx, in.Lc), in.Host, options...)
|
||||
@ -394,7 +445,11 @@ func HashMsgId(m *pubsub_pb.Message) string {
|
||||
return string(hash[:])
|
||||
}
|
||||
|
||||
func newTracerWrapper(tr pubsub.EventTracer, topics ...string) pubsub.EventTracer {
|
||||
func newTracerWrapper(
|
||||
lp2pTracer pubsub.EventTracer,
|
||||
lotusTracer pubsub.EventTracer,
|
||||
topics ...string,
|
||||
) pubsub.EventTracer {
|
||||
var topicsMap map[string]struct{}
|
||||
if len(topics) > 0 {
|
||||
topicsMap = make(map[string]struct{})
|
||||
@ -403,12 +458,13 @@ func newTracerWrapper(tr pubsub.EventTracer, topics ...string) pubsub.EventTrace
|
||||
}
|
||||
}
|
||||
|
||||
return &tracerWrapper{tr: tr, topics: topicsMap}
|
||||
return &tracerWrapper{lp2pTracer: lp2pTracer, lotusTracer: lotusTracer, topics: topicsMap}
|
||||
}
|
||||
|
||||
type tracerWrapper struct {
|
||||
tr pubsub.EventTracer
|
||||
topics map[string]struct{}
|
||||
lp2pTracer pubsub.EventTracer
|
||||
lotusTracer pubsub.EventTracer
|
||||
topics map[string]struct{}
|
||||
}
|
||||
|
||||
func (trw *tracerWrapper) traceMessage(topic string) bool {
|
||||
@ -426,33 +482,70 @@ func (trw *tracerWrapper) Trace(evt *pubsub_pb.TraceEvent) {
|
||||
switch evt.GetType() {
|
||||
case pubsub_pb.TraceEvent_PUBLISH_MESSAGE:
|
||||
stats.Record(context.TODO(), metrics.PubsubPublishMessage.M(1))
|
||||
if trw.tr != nil && trw.traceMessage(evt.GetPublishMessage().GetTopic()) {
|
||||
trw.tr.Trace(evt)
|
||||
if trw.traceMessage(evt.GetPublishMessage().GetTopic()) {
|
||||
if trw.lp2pTracer != nil {
|
||||
trw.lp2pTracer.Trace(evt)
|
||||
}
|
||||
|
||||
if trw.lotusTracer != nil {
|
||||
trw.lotusTracer.Trace(evt)
|
||||
}
|
||||
}
|
||||
case pubsub_pb.TraceEvent_DELIVER_MESSAGE:
|
||||
stats.Record(context.TODO(), metrics.PubsubDeliverMessage.M(1))
|
||||
if trw.tr != nil && trw.traceMessage(evt.GetDeliverMessage().GetTopic()) {
|
||||
trw.tr.Trace(evt)
|
||||
if trw.traceMessage(evt.GetDeliverMessage().GetTopic()) {
|
||||
if trw.lp2pTracer != nil {
|
||||
trw.lp2pTracer.Trace(evt)
|
||||
}
|
||||
|
||||
if trw.lotusTracer != nil {
|
||||
trw.lotusTracer.Trace(evt)
|
||||
}
|
||||
}
|
||||
case pubsub_pb.TraceEvent_REJECT_MESSAGE:
|
||||
stats.Record(context.TODO(), metrics.PubsubRejectMessage.M(1))
|
||||
if trw.traceMessage(evt.GetRejectMessage().GetTopic()) {
|
||||
if trw.lp2pTracer != nil {
|
||||
trw.lp2pTracer.Trace(evt)
|
||||
}
|
||||
|
||||
if trw.lotusTracer != nil {
|
||||
trw.lotusTracer.Trace(evt)
|
||||
}
|
||||
}
|
||||
case pubsub_pb.TraceEvent_DUPLICATE_MESSAGE:
|
||||
stats.Record(context.TODO(), metrics.PubsubDuplicateMessage.M(1))
|
||||
case pubsub_pb.TraceEvent_JOIN:
|
||||
if trw.tr != nil {
|
||||
trw.tr.Trace(evt)
|
||||
if trw.lp2pTracer != nil {
|
||||
trw.lp2pTracer.Trace(evt)
|
||||
}
|
||||
|
||||
if trw.lotusTracer != nil {
|
||||
trw.lotusTracer.Trace(evt)
|
||||
}
|
||||
case pubsub_pb.TraceEvent_LEAVE:
|
||||
if trw.tr != nil {
|
||||
trw.tr.Trace(evt)
|
||||
if trw.lp2pTracer != nil {
|
||||
trw.lp2pTracer.Trace(evt)
|
||||
}
|
||||
|
||||
if trw.lotusTracer != nil {
|
||||
trw.lotusTracer.Trace(evt)
|
||||
}
|
||||
case pubsub_pb.TraceEvent_GRAFT:
|
||||
if trw.tr != nil {
|
||||
trw.tr.Trace(evt)
|
||||
if trw.lp2pTracer != nil {
|
||||
trw.lp2pTracer.Trace(evt)
|
||||
}
|
||||
|
||||
if trw.lotusTracer != nil {
|
||||
trw.lotusTracer.Trace(evt)
|
||||
}
|
||||
case pubsub_pb.TraceEvent_PRUNE:
|
||||
if trw.tr != nil {
|
||||
trw.tr.Trace(evt)
|
||||
if trw.lp2pTracer != nil {
|
||||
trw.lp2pTracer.Trace(evt)
|
||||
}
|
||||
|
||||
if trw.lotusTracer != nil {
|
||||
trw.lotusTracer.Trace(evt)
|
||||
}
|
||||
case pubsub_pb.TraceEvent_RECV_RPC:
|
||||
stats.Record(context.TODO(), metrics.PubsubRecvRPC.M(1))
|
||||
|
97
node/modules/tracer/elasticsearch_transport.go
Normal file
97
node/modules/tracer/elasticsearch_transport.go
Normal file
@ -0,0 +1,97 @@
|
||||
package tracer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strings"
|
||||
|
||||
"github.com/elastic/go-elasticsearch/v7"
|
||||
"github.com/elastic/go-elasticsearch/v7/esapi"
|
||||
)
|
||||
|
||||
const (
|
||||
ElasticSearchDefaultIndex = "lotus-pubsub"
|
||||
)
|
||||
|
||||
func NewElasticSearchTransport(connectionString string, elasticsearchIndex string) (TracerTransport, error) {
|
||||
conUrl, err := url.Parse(connectionString)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
username := conUrl.User.Username()
|
||||
password, _ := conUrl.User.Password()
|
||||
cfg := elasticsearch.Config{
|
||||
Addresses: []string{
|
||||
conUrl.Scheme + "://" + conUrl.Host,
|
||||
},
|
||||
Username: username,
|
||||
Password: password,
|
||||
}
|
||||
|
||||
es, err := elasticsearch.NewClient(cfg)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var esIndex string
|
||||
if elasticsearchIndex != "" {
|
||||
esIndex = elasticsearchIndex
|
||||
} else {
|
||||
esIndex = ElasticSearchDefaultIndex
|
||||
}
|
||||
|
||||
return &elasticSearchTransport{
|
||||
cl: es,
|
||||
esIndex: esIndex,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type elasticSearchTransport struct {
|
||||
cl *elasticsearch.Client
|
||||
esIndex string
|
||||
}
|
||||
|
||||
func (est *elasticSearchTransport) Transport(evt TracerTransportEvent) error {
|
||||
var e interface{}
|
||||
|
||||
if evt.lotusTraceEvent != nil {
|
||||
e = *evt.lotusTraceEvent
|
||||
} else if evt.pubsubTraceEvent != nil {
|
||||
e = *evt.pubsubTraceEvent
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
|
||||
jsonEvt, err := json.Marshal(e)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error while marshaling event: %s", err)
|
||||
}
|
||||
|
||||
req := esapi.IndexRequest{
|
||||
Index: est.esIndex,
|
||||
Body: strings.NewReader(string(jsonEvt)),
|
||||
Refresh: "true",
|
||||
}
|
||||
|
||||
// Perform the request with the client.
|
||||
res, err := req.Do(context.Background(), est.cl)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = res.Body.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if res.IsError() {
|
||||
return fmt.Errorf("[%s] Error indexing document ID=%s", res.Status(), req.DocumentID)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
41
node/modules/tracer/json_transport.go
Normal file
41
node/modules/tracer/json_transport.go
Normal file
@ -0,0 +1,41 @@
|
||||
package tracer
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
)
|
||||
|
||||
type jsonTracerTransport struct {
|
||||
out *os.File
|
||||
}
|
||||
|
||||
func NewJsonTracerTransport(file string) (TracerTransport, error) {
|
||||
out, err := os.OpenFile(file, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0660)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &jsonTracerTransport{
|
||||
out: out,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (jtt *jsonTracerTransport) Transport(evt TracerTransportEvent) error {
|
||||
var e interface{}
|
||||
if evt.lotusTraceEvent != nil {
|
||||
e = *evt.lotusTraceEvent
|
||||
} else if evt.pubsubTraceEvent != nil {
|
||||
e = *evt.pubsubTraceEvent
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
|
||||
jsonEvt, err := json.Marshal(e)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error while marshaling event: %s", err)
|
||||
}
|
||||
|
||||
_, err = jtt.out.WriteString(string(jsonEvt) + "\n")
|
||||
return err
|
||||
}
|
120
node/modules/tracer/tracer.go
Normal file
120
node/modules/tracer/tracer.go
Normal file
@ -0,0 +1,120 @@
|
||||
package tracer
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
|
||||
)
|
||||
|
||||
var log = logging.Logger("lotus-tracer")
|
||||
|
||||
func NewLotusTracer(tt []TracerTransport, pid peer.ID, sourceAuth string) LotusTracer {
|
||||
return &lotusTracer{
|
||||
tt: tt,
|
||||
pid: pid,
|
||||
sa: sourceAuth,
|
||||
}
|
||||
}
|
||||
|
||||
type lotusTracer struct {
|
||||
tt []TracerTransport
|
||||
pid peer.ID
|
||||
sa string
|
||||
}
|
||||
|
||||
const (
|
||||
TraceEventPeerScores pubsub_pb.TraceEvent_Type = 100
|
||||
)
|
||||
|
||||
type LotusTraceEvent struct {
|
||||
Type pubsub_pb.TraceEvent_Type `json:"type,omitempty"`
|
||||
PeerID string `json:"peerID,omitempty"`
|
||||
Timestamp *int64 `json:"timestamp,omitempty"`
|
||||
PeerScore TraceEventPeerScore `json:"peerScore,omitempty"`
|
||||
SourceAuth string `json:"sourceAuth,omitempty"`
|
||||
}
|
||||
|
||||
type TopicScore struct {
|
||||
Topic string `json:"topic"`
|
||||
TimeInMesh time.Duration `json:"timeInMesh"`
|
||||
FirstMessageDeliveries float64 `json:"firstMessageDeliveries"`
|
||||
MeshMessageDeliveries float64 `json:"meshMessageDeliveries"`
|
||||
InvalidMessageDeliveries float64 `json:"invalidMessageDeliveries"`
|
||||
}
|
||||
|
||||
type TraceEventPeerScore struct {
|
||||
PeerID string `json:"peerID"`
|
||||
Score float64 `json:"score"`
|
||||
AppSpecificScore float64 `json:"appSpecificScore"`
|
||||
IPColocationFactor float64 `json:"ipColocationFactor"`
|
||||
BehaviourPenalty float64 `json:"behaviourPenalty"`
|
||||
Topics []TopicScore `json:"topics"`
|
||||
}
|
||||
|
||||
type LotusTracer interface {
|
||||
Trace(evt *pubsub_pb.TraceEvent)
|
||||
TraceLotusEvent(evt *LotusTraceEvent)
|
||||
|
||||
PeerScores(scores map[peer.ID]*pubsub.PeerScoreSnapshot)
|
||||
}
|
||||
|
||||
func (lt *lotusTracer) PeerScores(scores map[peer.ID]*pubsub.PeerScoreSnapshot) {
|
||||
now := time.Now().UnixNano()
|
||||
for pid, score := range scores {
|
||||
var topics []TopicScore
|
||||
for topic, snapshot := range score.Topics {
|
||||
topics = append(topics, TopicScore{
|
||||
Topic: topic,
|
||||
TimeInMesh: snapshot.TimeInMesh,
|
||||
FirstMessageDeliveries: snapshot.FirstMessageDeliveries,
|
||||
MeshMessageDeliveries: snapshot.MeshMessageDeliveries,
|
||||
InvalidMessageDeliveries: snapshot.InvalidMessageDeliveries,
|
||||
})
|
||||
}
|
||||
|
||||
evt := &LotusTraceEvent{
|
||||
Type: *TraceEventPeerScores.Enum(),
|
||||
PeerID: lt.pid.Pretty(),
|
||||
Timestamp: &now,
|
||||
SourceAuth: lt.sa,
|
||||
PeerScore: TraceEventPeerScore{
|
||||
PeerID: pid.Pretty(),
|
||||
Score: score.Score,
|
||||
AppSpecificScore: score.AppSpecificScore,
|
||||
IPColocationFactor: score.IPColocationFactor,
|
||||
BehaviourPenalty: score.BehaviourPenalty,
|
||||
Topics: topics,
|
||||
},
|
||||
}
|
||||
|
||||
lt.TraceLotusEvent(evt)
|
||||
}
|
||||
}
|
||||
|
||||
func (lt *lotusTracer) TraceLotusEvent(evt *LotusTraceEvent) {
|
||||
for _, t := range lt.tt {
|
||||
err := t.Transport(TracerTransportEvent{
|
||||
lotusTraceEvent: evt,
|
||||
pubsubTraceEvent: nil,
|
||||
})
|
||||
if err != nil {
|
||||
log.Errorf("error while transporting peer scores: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (lt *lotusTracer) Trace(evt *pubsub_pb.TraceEvent) {
|
||||
for _, t := range lt.tt {
|
||||
err := t.Transport(TracerTransportEvent{
|
||||
lotusTraceEvent: nil,
|
||||
pubsubTraceEvent: evt,
|
||||
})
|
||||
if err != nil {
|
||||
log.Errorf("error while transporting trace event: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
109
node/modules/tracer/tracer_test.go
Normal file
109
node/modules/tracer/tracer_test.go
Normal file
@ -0,0 +1,109 @@
|
||||
package tracer
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type testTracerTransport struct {
|
||||
t *testing.T
|
||||
executeTest func(t *testing.T, evt TracerTransportEvent)
|
||||
}
|
||||
|
||||
const peerIDA peer.ID = "12D3KooWAbSVMgRejb6ECg6fRTkCPGCfu8396msZVryu8ivcz44G"
|
||||
|
||||
func NewTestTraceTransport(t *testing.T, executeTest func(t *testing.T, evt TracerTransportEvent)) TracerTransport {
|
||||
return &testTracerTransport{
|
||||
t: t,
|
||||
executeTest: executeTest,
|
||||
}
|
||||
}
|
||||
|
||||
func (ttt *testTracerTransport) Transport(evt TracerTransportEvent) error {
|
||||
ttt.executeTest(ttt.t, evt)
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestTracer_PeerScores(t *testing.T) {
|
||||
|
||||
testTransport := NewTestTraceTransport(t, func(t *testing.T, evt TracerTransportEvent) {
|
||||
require.Equal(t, peerIDA.Pretty(), evt.lotusTraceEvent.PeerID)
|
||||
require.Equal(t, "source-auth-token-test", evt.lotusTraceEvent.SourceAuth)
|
||||
require.Equal(t, float64(32), evt.lotusTraceEvent.PeerScore.Score)
|
||||
|
||||
n := time.Now().UnixNano()
|
||||
require.LessOrEqual(t, *evt.lotusTraceEvent.Timestamp, n)
|
||||
|
||||
require.Equal(t, peerIDA.Pretty(), evt.lotusTraceEvent.PeerScore.PeerID)
|
||||
require.Equal(t, 1, len(evt.lotusTraceEvent.PeerScore.Topics))
|
||||
|
||||
topic := evt.lotusTraceEvent.PeerScore.Topics[0]
|
||||
require.Equal(t, "topicA", topic.Topic)
|
||||
require.Equal(t, float64(100), topic.FirstMessageDeliveries)
|
||||
})
|
||||
|
||||
lt := NewLotusTracer(
|
||||
[]TracerTransport{testTransport},
|
||||
peerIDA,
|
||||
"source-auth-token-test",
|
||||
)
|
||||
|
||||
topics := make(map[string]*pubsub.TopicScoreSnapshot)
|
||||
topics["topicA"] = &pubsub.TopicScoreSnapshot{
|
||||
FirstMessageDeliveries: float64(100),
|
||||
}
|
||||
|
||||
m := make(map[peer.ID]*pubsub.PeerScoreSnapshot)
|
||||
m[peerIDA] = &pubsub.PeerScoreSnapshot{
|
||||
Score: float64(32),
|
||||
Topics: topics,
|
||||
}
|
||||
|
||||
lt.PeerScores(m)
|
||||
}
|
||||
|
||||
func TestTracer_PubSubTrace(t *testing.T) {
|
||||
n := time.Now().Unix()
|
||||
|
||||
testTransport := NewTestTraceTransport(t, func(t *testing.T, evt TracerTransportEvent) {
|
||||
require.Equal(t, []byte(peerIDA), evt.pubsubTraceEvent.PeerID)
|
||||
require.Equal(t, &n, evt.pubsubTraceEvent.Timestamp)
|
||||
})
|
||||
|
||||
lt := NewLotusTracer(
|
||||
[]TracerTransport{testTransport},
|
||||
"pid",
|
||||
"source-auth",
|
||||
)
|
||||
|
||||
lt.Trace(&pubsub_pb.TraceEvent{
|
||||
PeerID: []byte(peerIDA),
|
||||
Timestamp: &n,
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func TestTracer_MultipleTransports(t *testing.T) {
|
||||
testTransportA := NewTestTraceTransport(t, func(t *testing.T, evt TracerTransportEvent) {
|
||||
require.Equal(t, []byte(peerIDA), evt.pubsubTraceEvent.PeerID)
|
||||
})
|
||||
|
||||
testTransportB := NewTestTraceTransport(t, func(t *testing.T, evt TracerTransportEvent) {
|
||||
require.Equal(t, []byte(peerIDA), evt.pubsubTraceEvent.PeerID)
|
||||
})
|
||||
|
||||
executeTest := NewLotusTracer(
|
||||
[]TracerTransport{testTransportA, testTransportB},
|
||||
"pid",
|
||||
"source-auth",
|
||||
)
|
||||
|
||||
executeTest.Trace(&pubsub_pb.TraceEvent{
|
||||
PeerID: []byte(peerIDA),
|
||||
})
|
||||
}
|
12
node/modules/tracer/transport.go
Normal file
12
node/modules/tracer/transport.go
Normal file
@ -0,0 +1,12 @@
|
||||
package tracer
|
||||
|
||||
import pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
|
||||
|
||||
type TracerTransport interface {
|
||||
Transport(evt TracerTransportEvent) error
|
||||
}
|
||||
|
||||
type TracerTransportEvent struct {
|
||||
lotusTraceEvent *LotusTraceEvent
|
||||
pubsubTraceEvent *pubsub_pb.TraceEvent
|
||||
}
|
20
tools/kibana/README.md
Normal file
20
tools/kibana/README.md
Normal file
@ -0,0 +1,20 @@
|
||||
## Lotus Kibana Dashboard
|
||||
|
||||
This folder contains configuration files to create Kibana dashboards to track peer scores and block propagation
|
||||
throughout Filecoin network.
|
||||
|
||||
### Importing index template
|
||||
|
||||
Index template needs to be imported into Elasticsearch for score weights and to
|
||||
prevent Elasticsearch from infering wrong field type.
|
||||
|
||||
The [template](./index-template.json) is loaded via [Kibana Index Management](https://www.elastic.co/guide/en/elasticsearch/reference/current/index-mgmt.html) and pasted
|
||||
into newly created Index Template.
|
||||
|
||||
### Importing dashboard
|
||||
|
||||
The peer score and block propagation dashboard configuration is imported via Kibana import saved object [functionality](https://www.elastic.co/guide/en/kibana/current/managing-saved-objects.html#managing-saved-objects-export-objects).
|
||||
|
||||
#### Custom index
|
||||
|
||||
By default, the dashboards and index template target `lotus-pubsub` index which is the default one when running node. The index can be customised via edit on dashboard visualizations and also index template needs to be updated to target new index.
|
2
tools/kibana/block-propagation-dashboard.ndjson
Normal file
2
tools/kibana/block-propagation-dashboard.ndjson
Normal file
File diff suppressed because one or more lines are too long
66
tools/kibana/index-template.json
Normal file
66
tools/kibana/index-template.json
Normal file
@ -0,0 +1,66 @@
|
||||
{
|
||||
"template": {
|
||||
"settings": {},
|
||||
"mappings": {
|
||||
"runtime": {
|
||||
"peerScore.weightedScore": {
|
||||
"type": "double",
|
||||
"script": {
|
||||
"source": "if (doc['type'].value == 100) {\n def score = doc['peerScore.score'].value;\n if (doc['sourceAuth'] == \"<password>\") {\n\n emit(score * 1.2)\n } else {\n emit(score)\n }\n}\n",
|
||||
"lang": "painless"
|
||||
}
|
||||
}
|
||||
},
|
||||
"properties": {
|
||||
"peerScore": {
|
||||
"properties": {
|
||||
"appSpecificScore": {
|
||||
"type": "double"
|
||||
},
|
||||
"behaviourPenalty": {
|
||||
"type": "double"
|
||||
},
|
||||
"ipColocationFactor": {
|
||||
"type": "double"
|
||||
},
|
||||
"score": {
|
||||
"type": "double"
|
||||
},
|
||||
"topics": {
|
||||
"type": "nested",
|
||||
"properties": {
|
||||
"firstMessageDeliveries": {
|
||||
"type": "double",
|
||||
"ignore_malformed": false,
|
||||
"coerce": true
|
||||
},
|
||||
"invalidMessageDeliveries": {
|
||||
"type": "double",
|
||||
"ignore_malformed": false,
|
||||
"coerce": true
|
||||
},
|
||||
"meshMessageDeliveries": {
|
||||
"type": "double",
|
||||
"ignore_malformed": false,
|
||||
"coerce": true
|
||||
},
|
||||
"timeInMesh": {
|
||||
"type": "double",
|
||||
"ignore_malformed": false,
|
||||
"coerce": true
|
||||
},
|
||||
"topic": {
|
||||
"type": "keyword"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"sourceAuth": {
|
||||
"type": "keyword"
|
||||
}
|
||||
}
|
||||
},
|
||||
"aliases": {}
|
||||
}
|
||||
}
|
2
tools/kibana/peer-scores-dashboard.ndjson
Normal file
2
tools/kibana/peer-scores-dashboard.ndjson
Normal file
@ -0,0 +1,2 @@
|
||||
{"attributes":{"description":"Average peer score table per node peerID","hits":0,"kibanaSavedObjectMeta":{"searchSourceJSON":"{\"query\":{\"query\":\"\",\"language\":\"kuery\"},\"filter\":[]}"},"optionsJSON":"{\"useMargins\":true,\"syncColors\":false,\"hidePanelTitles\":false}","panelsJSON":"[{\"version\":\"7.14.1\",\"type\":\"lens\",\"gridData\":{\"x\":0,\"y\":0,\"w\":48,\"h\":43,\"i\":\"cddc98a5-45f3-4ba7-a7c6-6b9d216b19da\"},\"panelIndex\":\"cddc98a5-45f3-4ba7-a7c6-6b9d216b19da\",\"embeddableConfig\":{\"attributes\":{\"title\":\"\",\"type\":\"lens\",\"visualizationType\":\"lnsDatatable\",\"state\":{\"datasourceStates\":{\"indexpattern\":{\"layers\":{\"666e2f39-8868-45ad-b747-fe124830b0ae\":{\"columns\":{\"504c50bd-14c1-4119-820e-c961866fc3b4\":{\"label\":\"peerID\",\"dataType\":\"string\",\"operationType\":\"terms\",\"scale\":\"ordinal\",\"sourceField\":\"peerScore.peerID.keyword\",\"isBucketed\":true,\"params\":{\"size\":100,\"orderBy\":{\"type\":\"column\",\"columnId\":\"ec82c5f7-b3c4-4715-8646-a8fe584400fc\"},\"orderDirection\":\"desc\",\"otherBucket\":false,\"missingBucket\":false},\"customLabel\":true},\"ec82c5f7-b3c4-4715-8646-a8fe584400fc\":{\"label\":\"Score\",\"dataType\":\"number\",\"operationType\":\"average\",\"sourceField\":\"peerScore.score\",\"isBucketed\":false,\"scale\":\"ratio\",\"customLabel\":true},\"e22a19e8-1d71-43d6-9ca0-b30ddd423447\":{\"label\":\"Weighted Score\",\"dataType\":\"number\",\"operationType\":\"average\",\"sourceField\":\"peerScore.weightedScore\",\"isBucketed\":false,\"scale\":\"ratio\",\"customLabel\":true}},\"columnOrder\":[\"504c50bd-14c1-4119-820e-c961866fc3b4\",\"ec82c5f7-b3c4-4715-8646-a8fe584400fc\",\"e22a19e8-1d71-43d6-9ca0-b30ddd423447\"],\"incompleteColumns\":{}}}}},\"visualization\":{\"columns\":[{\"isTransposed\":false,\"columnId\":\"504c50bd-14c1-4119-820e-c961866fc3b4\"},{\"isTransposed\":false,\"columnId\":\"ec82c5f7-b3c4-4715-8646-a8fe584400fc\",\"colorMode\":\"cell\",\"palette\":{\"type\":\"palette\",\"name\":\"positive\",\"params\":{\"stops\":[{\"color\":\"#d6e9e4\",\"stop\":20},{\"color\":\"#aed3ca\",\"stop\":40},{\"color\":\"#85bdb1\",\"stop\":60},{\"color\":\"#5aa898\",\"stop\":80},{\"color\":\"#209280\",\"stop\":100}]}}},{\"columnId\":\"e22a19e8-1d71-43d6-9ca0-b30ddd423447\",\"isTransposed\":false,\"colorMode\":\"cell\",\"palette\":{\"type\":\"palette\",\"name\":\"positive\",\"params\":{\"stops\":[{\"color\":\"#d6e9e4\",\"stop\":20},{\"color\":\"#aed3ca\",\"stop\":40},{\"color\":\"#85bdb1\",\"stop\":60},{\"color\":\"#5aa898\",\"stop\":80},{\"color\":\"#209280\",\"stop\":100}]}}}],\"layerId\":\"666e2f39-8868-45ad-b747-fe124830b0ae\"},\"query\":{\"query\":\"\",\"language\":\"kuery\"},\"filters\":[]},\"references\":[{\"type\":\"index-pattern\",\"id\":\"9890c040-17b7-11ec-99f4-75d57f0cd0d8\",\"name\":\"indexpattern-datasource-current-indexpattern\"},{\"type\":\"index-pattern\",\"id\":\"2c407db0-1acb-11ec-99f4-75d57f0cd0d8\",\"name\":\"indexpattern-datasource-layer-666e2f39-8868-45ad-b747-fe124830b0ae\"}]},\"hidePanelTitles\":false,\"enhancements\":{}},\"title\":\"Peer Scores\"}]","timeRestore":false,"title":"Peer Scores","version":1},"coreMigrationVersion":"7.14.1","id":"e7e4fd70-1acb-11ec-99f4-75d57f0cd0d8","migrationVersion":{"dashboard":"7.14.0"},"references":[{"id":"2c407db0-1acb-11ec-99f4-75d57f0cd0d8","name":"cddc98a5-45f3-4ba7-a7c6-6b9d216b19da:indexpattern-datasource-current-indexpattern","type":"index-pattern"},{"id":"2c407db0-1acb-11ec-99f4-75d57f0cd0d8","name":"cddc98a5-45f3-4ba7-a7c6-6b9d216b19da:indexpattern-datasource-layer-666e2f39-8868-45ad-b747-fe124830b0ae","type":"index-pattern"}],"type":"dashboard","updated_at":"2021-09-24T12:06:00.625Z","version":"WzczNDgsMV0="}
|
||||
{"excludedObjects":[],"excludedObjectsCount":0,"exportedCount":1,"missingRefCount":0,"missingReferences":[]}
|
Loading…
Reference in New Issue
Block a user