diff --git a/documentation/en/default-lotus-config.toml b/documentation/en/default-lotus-config.toml index 889b87e4e..7f4ff591b 100644 --- a/documentation/en/default-lotus-config.toml +++ b/documentation/en/default-lotus-config.toml @@ -98,6 +98,35 @@ # env var: LOTUS_PUBSUB_REMOTETRACER #RemoteTracer = "" + # Path to file that will be used to output tracer content in JSON format. + # If present tracer will save data to defined file. + # Format: file path + # + # type: string + # env var: LOTUS_PUBSUB_JSONTRACER + #JsonTracer = "" + + # Connection string for elasticsearch instance. + # If present tracer will save data to elasticsearch. + # Format: https://:@:/ + # + # type: string + # env var: LOTUS_PUBSUB_ELASTICSEARCHTRACER + #ElasticSearchTracer = "" + + # Name of elasticsearch index that will be used to save tracer data. + # This property is used only if ElasticSearchTracer propery is set. + # + # type: string + # env var: LOTUS_PUBSUB_ELASTICSEARCHINDEX + #ElasticSearchIndex = "" + + # Auth token that will be passed with logs to elasticsearch - used for weighted peers score. + # + # type: string + # env var: LOTUS_PUBSUB_TRACERSOURCEAUTH + #TracerSourceAuth = "" + [Client] # type: bool diff --git a/documentation/en/default-lotus-miner-config.toml b/documentation/en/default-lotus-miner-config.toml index 939bac0cc..416531f98 100644 --- a/documentation/en/default-lotus-miner-config.toml +++ b/documentation/en/default-lotus-miner-config.toml @@ -98,6 +98,35 @@ # env var: LOTUS_PUBSUB_REMOTETRACER #RemoteTracer = "" + # Path to file that will be used to output tracer content in JSON format. + # If present tracer will save data to defined file. + # Format: file path + # + # type: string + # env var: LOTUS_PUBSUB_JSONTRACER + #JsonTracer = "" + + # Connection string for elasticsearch instance. + # If present tracer will save data to elasticsearch. + # Format: https://:@:/ + # + # type: string + # env var: LOTUS_PUBSUB_ELASTICSEARCHTRACER + #ElasticSearchTracer = "" + + # Name of elasticsearch index that will be used to save tracer data. + # This property is used only if ElasticSearchTracer propery is set. + # + # type: string + # env var: LOTUS_PUBSUB_ELASTICSEARCHINDEX + #ElasticSearchIndex = "" + + # Auth token that will be passed with logs to elasticsearch - used for weighted peers score. + # + # type: string + # env var: LOTUS_PUBSUB_TRACERSOURCEAUTH + #TracerSourceAuth = "" + [Subsystems] # type: bool diff --git a/go.mod b/go.mod index 2caaa8773..f8f875350 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( github.com/drand/drand v1.3.0 github.com/drand/kyber v1.1.7 github.com/dustin/go-humanize v1.0.0 + github.com/elastic/go-elasticsearch/v7 v7.14.0 github.com/elastic/go-sysinfo v1.7.0 github.com/elastic/gosigar v0.14.2 github.com/etclabscore/go-openrpc-reflect v0.0.36 diff --git a/go.sum b/go.sum index 2ea789caf..28d74d583 100644 --- a/go.sum +++ b/go.sum @@ -266,6 +266,8 @@ github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5m github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= +github.com/elastic/go-elasticsearch/v7 v7.14.0 h1:extp3jos/rwJn3J+lgbaGlwAgs0TVsIHme00GyNAyX4= +github.com/elastic/go-elasticsearch/v7 v7.14.0/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= github.com/elastic/go-sysinfo v1.7.0 h1:4vVvcfi255+8+TyQ7TYUTEK3A+G8v5FLE+ZKYL1z1Dg= github.com/elastic/go-sysinfo v1.7.0/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0= github.com/elastic/go-windows v1.0.0 h1:qLURgZFkkrYyTTkvYpsZIgf83AUsdIHfvlJaqaZ7aSY= diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index ecf533137..4723cc59f 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -788,6 +788,35 @@ Type: Array of multiaddress peerinfo strings, must include peerid (/p2p/12D3K... Comment: ``, }, + { + Name: "JsonTracer", + Type: "string", + + Comment: `Path to file that will be used to output tracer content in JSON format. +If present tracer will save data to defined file. +Format: file path`, + }, + { + Name: "ElasticSearchTracer", + Type: "string", + + Comment: `Connection string for elasticsearch instance. +If present tracer will save data to elasticsearch. +Format: https://:@:/`, + }, + { + Name: "ElasticSearchIndex", + Type: "string", + + Comment: `Name of elasticsearch index that will be used to save tracer data. +This property is used only if ElasticSearchTracer propery is set.`, + }, + { + Name: "TracerSourceAuth", + Type: "string", + + Comment: `Auth token that will be passed with logs to elasticsearch - used for weighted peers score.`, + }, }, "RetrievalPricing": []DocField{ { diff --git a/node/config/types.go b/node/config/types.go index 90d878b7e..cc1a943f4 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -563,6 +563,19 @@ type Pubsub struct { DirectPeers []string IPColocationWhitelist []string RemoteTracer string + // Path to file that will be used to output tracer content in JSON format. + // If present tracer will save data to defined file. + // Format: file path + JsonTracer string + // Connection string for elasticsearch instance. + // If present tracer will save data to elasticsearch. + // Format: https://:@:/ + ElasticSearchTracer string + // Name of elasticsearch index that will be used to save tracer data. + // This property is used only if ElasticSearchTracer propery is set. + ElasticSearchIndex string + // Auth token that will be passed with logs to elasticsearch - used for weighted peers score. + TracerSourceAuth string } type Chainstore struct { diff --git a/node/modules/lp2p/pubsub.go b/node/modules/lp2p/pubsub.go index 9f1b58c2b..5376d93c8 100644 --- a/node/modules/lp2p/pubsub.go +++ b/node/modules/lp2p/pubsub.go @@ -21,6 +21,7 @@ import ( "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/helpers" + "github.com/filecoin-project/lotus/node/modules/tracer" ) func init() { @@ -49,6 +50,30 @@ func ScoreKeeper() *dtypes.ScoreKeeper { return new(dtypes.ScoreKeeper) } +type PeerScoreTracker interface { + UpdatePeerScore(scores map[peer.ID]*pubsub.PeerScoreSnapshot) +} + +type peerScoreTracker struct { + sk *dtypes.ScoreKeeper + lt tracer.LotusTracer +} + +func newPeerScoreTracker(lt tracer.LotusTracer, sk *dtypes.ScoreKeeper) PeerScoreTracker { + return &peerScoreTracker{ + sk: sk, + lt: lt, + } +} + +func (pst *peerScoreTracker) UpdatePeerScore(scores map[peer.ID]*pubsub.PeerScoreSnapshot) { + if pst.lt != nil { + pst.lt.PeerScores(scores) + } + + pst.sk.Update(scores) +} + type GossipIn struct { fx.In Mctx helpers.MetricsCtx @@ -291,7 +316,6 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) { OpportunisticGraftThreshold: OpportunisticGraftScoreThreshold, }, ), - pubsub.WithPeerScoreInspect(in.Sk.Update, 10*time.Second), } // enable Peer eXchange on bootstrappers @@ -361,6 +385,27 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) { pubsub.NewAllowlistSubscriptionFilter(allowTopics...), 100))) + var transports []tracer.TracerTransport + if in.Cfg.JsonTracer != "" { + jsonTransport, err := tracer.NewJsonTracerTransport(in.Cfg.JsonTracer) + if err != nil { + return nil, err + } + + transports = append(transports, jsonTransport) + } + if in.Cfg.ElasticSearchTracer != "" { + elasticSearchTransport, err := tracer.NewElasticSearchTransport( + in.Cfg.ElasticSearchTracer, + in.Cfg.ElasticSearchIndex, + ) + if err != nil { + return nil, err + } + transports = append(transports, elasticSearchTransport) + } + lt := tracer.NewLotusTracer(transports, in.Host.ID(), in.Cfg.TracerSourceAuth) + // tracer if in.Cfg.RemoteTracer != "" { a, err := ma.NewMultiaddr(in.Cfg.RemoteTracer) @@ -378,12 +423,18 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) { return nil, err } - trw := newTracerWrapper(tr, build.BlocksTopic(in.Nn)) + pst := newPeerScoreTracker(lt, in.Sk) + trw := newTracerWrapper(tr, lt, build.BlocksTopic(in.Nn)) + options = append(options, pubsub.WithEventTracer(trw)) + options = append(options, pubsub.WithPeerScoreInspect(pst.UpdatePeerScore, 10*time.Second)) } else { // still instantiate a tracer for collecting metrics - trw := newTracerWrapper(nil) + trw := newTracerWrapper(nil, lt) options = append(options, pubsub.WithEventTracer(trw)) + + pst := newPeerScoreTracker(lt, in.Sk) + options = append(options, pubsub.WithPeerScoreInspect(pst.UpdatePeerScore, 10*time.Second)) } return pubsub.NewGossipSub(helpers.LifecycleCtx(in.Mctx, in.Lc), in.Host, options...) @@ -394,7 +445,11 @@ func HashMsgId(m *pubsub_pb.Message) string { return string(hash[:]) } -func newTracerWrapper(tr pubsub.EventTracer, topics ...string) pubsub.EventTracer { +func newTracerWrapper( + lp2pTracer pubsub.EventTracer, + lotusTracer pubsub.EventTracer, + topics ...string, +) pubsub.EventTracer { var topicsMap map[string]struct{} if len(topics) > 0 { topicsMap = make(map[string]struct{}) @@ -403,12 +458,13 @@ func newTracerWrapper(tr pubsub.EventTracer, topics ...string) pubsub.EventTrace } } - return &tracerWrapper{tr: tr, topics: topicsMap} + return &tracerWrapper{lp2pTracer: lp2pTracer, lotusTracer: lotusTracer, topics: topicsMap} } type tracerWrapper struct { - tr pubsub.EventTracer - topics map[string]struct{} + lp2pTracer pubsub.EventTracer + lotusTracer pubsub.EventTracer + topics map[string]struct{} } func (trw *tracerWrapper) traceMessage(topic string) bool { @@ -426,33 +482,70 @@ func (trw *tracerWrapper) Trace(evt *pubsub_pb.TraceEvent) { switch evt.GetType() { case pubsub_pb.TraceEvent_PUBLISH_MESSAGE: stats.Record(context.TODO(), metrics.PubsubPublishMessage.M(1)) - if trw.tr != nil && trw.traceMessage(evt.GetPublishMessage().GetTopic()) { - trw.tr.Trace(evt) + if trw.traceMessage(evt.GetPublishMessage().GetTopic()) { + if trw.lp2pTracer != nil { + trw.lp2pTracer.Trace(evt) + } + + if trw.lotusTracer != nil { + trw.lotusTracer.Trace(evt) + } } case pubsub_pb.TraceEvent_DELIVER_MESSAGE: stats.Record(context.TODO(), metrics.PubsubDeliverMessage.M(1)) - if trw.tr != nil && trw.traceMessage(evt.GetDeliverMessage().GetTopic()) { - trw.tr.Trace(evt) + if trw.traceMessage(evt.GetDeliverMessage().GetTopic()) { + if trw.lp2pTracer != nil { + trw.lp2pTracer.Trace(evt) + } + + if trw.lotusTracer != nil { + trw.lotusTracer.Trace(evt) + } } case pubsub_pb.TraceEvent_REJECT_MESSAGE: stats.Record(context.TODO(), metrics.PubsubRejectMessage.M(1)) + if trw.traceMessage(evt.GetRejectMessage().GetTopic()) { + if trw.lp2pTracer != nil { + trw.lp2pTracer.Trace(evt) + } + + if trw.lotusTracer != nil { + trw.lotusTracer.Trace(evt) + } + } case pubsub_pb.TraceEvent_DUPLICATE_MESSAGE: stats.Record(context.TODO(), metrics.PubsubDuplicateMessage.M(1)) case pubsub_pb.TraceEvent_JOIN: - if trw.tr != nil { - trw.tr.Trace(evt) + if trw.lp2pTracer != nil { + trw.lp2pTracer.Trace(evt) + } + + if trw.lotusTracer != nil { + trw.lotusTracer.Trace(evt) } case pubsub_pb.TraceEvent_LEAVE: - if trw.tr != nil { - trw.tr.Trace(evt) + if trw.lp2pTracer != nil { + trw.lp2pTracer.Trace(evt) + } + + if trw.lotusTracer != nil { + trw.lotusTracer.Trace(evt) } case pubsub_pb.TraceEvent_GRAFT: - if trw.tr != nil { - trw.tr.Trace(evt) + if trw.lp2pTracer != nil { + trw.lp2pTracer.Trace(evt) + } + + if trw.lotusTracer != nil { + trw.lotusTracer.Trace(evt) } case pubsub_pb.TraceEvent_PRUNE: - if trw.tr != nil { - trw.tr.Trace(evt) + if trw.lp2pTracer != nil { + trw.lp2pTracer.Trace(evt) + } + + if trw.lotusTracer != nil { + trw.lotusTracer.Trace(evt) } case pubsub_pb.TraceEvent_RECV_RPC: stats.Record(context.TODO(), metrics.PubsubRecvRPC.M(1)) diff --git a/node/modules/tracer/elasticsearch_transport.go b/node/modules/tracer/elasticsearch_transport.go new file mode 100644 index 000000000..1f6f9a157 --- /dev/null +++ b/node/modules/tracer/elasticsearch_transport.go @@ -0,0 +1,97 @@ +package tracer + +import ( + "context" + "encoding/json" + "fmt" + "net/url" + "strings" + + "github.com/elastic/go-elasticsearch/v7" + "github.com/elastic/go-elasticsearch/v7/esapi" +) + +const ( + ElasticSearchDefaultIndex = "lotus-pubsub" +) + +func NewElasticSearchTransport(connectionString string, elasticsearchIndex string) (TracerTransport, error) { + conUrl, err := url.Parse(connectionString) + + if err != nil { + return nil, err + } + + username := conUrl.User.Username() + password, _ := conUrl.User.Password() + cfg := elasticsearch.Config{ + Addresses: []string{ + conUrl.Scheme + "://" + conUrl.Host, + }, + Username: username, + Password: password, + } + + es, err := elasticsearch.NewClient(cfg) + + if err != nil { + return nil, err + } + + var esIndex string + if elasticsearchIndex != "" { + esIndex = elasticsearchIndex + } else { + esIndex = ElasticSearchDefaultIndex + } + + return &elasticSearchTransport{ + cl: es, + esIndex: esIndex, + }, nil +} + +type elasticSearchTransport struct { + cl *elasticsearch.Client + esIndex string +} + +func (est *elasticSearchTransport) Transport(evt TracerTransportEvent) error { + var e interface{} + + if evt.lotusTraceEvent != nil { + e = *evt.lotusTraceEvent + } else if evt.pubsubTraceEvent != nil { + e = *evt.pubsubTraceEvent + } else { + return nil + } + + jsonEvt, err := json.Marshal(e) + if err != nil { + return fmt.Errorf("error while marshaling event: %s", err) + } + + req := esapi.IndexRequest{ + Index: est.esIndex, + Body: strings.NewReader(string(jsonEvt)), + Refresh: "true", + } + + // Perform the request with the client. + res, err := req.Do(context.Background(), est.cl) + if err != nil { + return err + } + + err = res.Body.Close() + if err != nil { + return err + } + + if res.IsError() { + return fmt.Errorf("[%s] Error indexing document ID=%s", res.Status(), req.DocumentID) + } + + return nil +} diff --git a/node/modules/tracer/json_transport.go b/node/modules/tracer/json_transport.go new file mode 100644 index 000000000..ca8535f4b --- /dev/null +++ b/node/modules/tracer/json_transport.go @@ -0,0 +1,41 @@ +package tracer + +import ( + "encoding/json" + "fmt" + "os" +) + +type jsonTracerTransport struct { + out *os.File +} + +func NewJsonTracerTransport(file string) (TracerTransport, error) { + out, err := os.OpenFile(file, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0660) + if err != nil { + return nil, err + } + + return &jsonTracerTransport{ + out: out, + }, nil +} + +func (jtt *jsonTracerTransport) Transport(evt TracerTransportEvent) error { + var e interface{} + if evt.lotusTraceEvent != nil { + e = *evt.lotusTraceEvent + } else if evt.pubsubTraceEvent != nil { + e = *evt.pubsubTraceEvent + } else { + return nil + } + + jsonEvt, err := json.Marshal(e) + if err != nil { + return fmt.Errorf("error while marshaling event: %s", err) + } + + _, err = jtt.out.WriteString(string(jsonEvt) + "\n") + return err +} diff --git a/node/modules/tracer/tracer.go b/node/modules/tracer/tracer.go new file mode 100644 index 000000000..e31b4ea52 --- /dev/null +++ b/node/modules/tracer/tracer.go @@ -0,0 +1,120 @@ +package tracer + +import ( + "time" + + logging "github.com/ipfs/go-log/v2" + "github.com/libp2p/go-libp2p-core/peer" + pubsub "github.com/libp2p/go-libp2p-pubsub" + pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb" +) + +var log = logging.Logger("lotus-tracer") + +func NewLotusTracer(tt []TracerTransport, pid peer.ID, sourceAuth string) LotusTracer { + return &lotusTracer{ + tt: tt, + pid: pid, + sa: sourceAuth, + } +} + +type lotusTracer struct { + tt []TracerTransport + pid peer.ID + sa string +} + +const ( + TraceEventPeerScores pubsub_pb.TraceEvent_Type = 100 +) + +type LotusTraceEvent struct { + Type pubsub_pb.TraceEvent_Type `json:"type,omitempty"` + PeerID string `json:"peerID,omitempty"` + Timestamp *int64 `json:"timestamp,omitempty"` + PeerScore TraceEventPeerScore `json:"peerScore,omitempty"` + SourceAuth string `json:"sourceAuth,omitempty"` +} + +type TopicScore struct { + Topic string `json:"topic"` + TimeInMesh time.Duration `json:"timeInMesh"` + FirstMessageDeliveries float64 `json:"firstMessageDeliveries"` + MeshMessageDeliveries float64 `json:"meshMessageDeliveries"` + InvalidMessageDeliveries float64 `json:"invalidMessageDeliveries"` +} + +type TraceEventPeerScore struct { + PeerID string `json:"peerID"` + Score float64 `json:"score"` + AppSpecificScore float64 `json:"appSpecificScore"` + IPColocationFactor float64 `json:"ipColocationFactor"` + BehaviourPenalty float64 `json:"behaviourPenalty"` + Topics []TopicScore `json:"topics"` +} + +type LotusTracer interface { + Trace(evt *pubsub_pb.TraceEvent) + TraceLotusEvent(evt *LotusTraceEvent) + + PeerScores(scores map[peer.ID]*pubsub.PeerScoreSnapshot) +} + +func (lt *lotusTracer) PeerScores(scores map[peer.ID]*pubsub.PeerScoreSnapshot) { + now := time.Now().UnixNano() + for pid, score := range scores { + var topics []TopicScore + for topic, snapshot := range score.Topics { + topics = append(topics, TopicScore{ + Topic: topic, + TimeInMesh: snapshot.TimeInMesh, + FirstMessageDeliveries: snapshot.FirstMessageDeliveries, + MeshMessageDeliveries: snapshot.MeshMessageDeliveries, + InvalidMessageDeliveries: snapshot.InvalidMessageDeliveries, + }) + } + + evt := &LotusTraceEvent{ + Type: *TraceEventPeerScores.Enum(), + PeerID: lt.pid.Pretty(), + Timestamp: &now, + SourceAuth: lt.sa, + PeerScore: TraceEventPeerScore{ + PeerID: pid.Pretty(), + Score: score.Score, + AppSpecificScore: score.AppSpecificScore, + IPColocationFactor: score.IPColocationFactor, + BehaviourPenalty: score.BehaviourPenalty, + Topics: topics, + }, + } + + lt.TraceLotusEvent(evt) + } +} + +func (lt *lotusTracer) TraceLotusEvent(evt *LotusTraceEvent) { + for _, t := range lt.tt { + err := t.Transport(TracerTransportEvent{ + lotusTraceEvent: evt, + pubsubTraceEvent: nil, + }) + if err != nil { + log.Errorf("error while transporting peer scores: %s", err) + } + } + +} + +func (lt *lotusTracer) Trace(evt *pubsub_pb.TraceEvent) { + for _, t := range lt.tt { + err := t.Transport(TracerTransportEvent{ + lotusTraceEvent: nil, + pubsubTraceEvent: evt, + }) + if err != nil { + log.Errorf("error while transporting trace event: %s", err) + } + } +} diff --git a/node/modules/tracer/tracer_test.go b/node/modules/tracer/tracer_test.go new file mode 100644 index 000000000..d5faf8a62 --- /dev/null +++ b/node/modules/tracer/tracer_test.go @@ -0,0 +1,109 @@ +package tracer + +import ( + "testing" + "time" + + "github.com/libp2p/go-libp2p-core/peer" + pubsub "github.com/libp2p/go-libp2p-pubsub" + pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb" + "github.com/stretchr/testify/require" +) + +type testTracerTransport struct { + t *testing.T + executeTest func(t *testing.T, evt TracerTransportEvent) +} + +const peerIDA peer.ID = "12D3KooWAbSVMgRejb6ECg6fRTkCPGCfu8396msZVryu8ivcz44G" + +func NewTestTraceTransport(t *testing.T, executeTest func(t *testing.T, evt TracerTransportEvent)) TracerTransport { + return &testTracerTransport{ + t: t, + executeTest: executeTest, + } +} + +func (ttt *testTracerTransport) Transport(evt TracerTransportEvent) error { + ttt.executeTest(ttt.t, evt) + return nil +} + +func TestTracer_PeerScores(t *testing.T) { + + testTransport := NewTestTraceTransport(t, func(t *testing.T, evt TracerTransportEvent) { + require.Equal(t, peerIDA.Pretty(), evt.lotusTraceEvent.PeerID) + require.Equal(t, "source-auth-token-test", evt.lotusTraceEvent.SourceAuth) + require.Equal(t, float64(32), evt.lotusTraceEvent.PeerScore.Score) + + n := time.Now().UnixNano() + require.LessOrEqual(t, *evt.lotusTraceEvent.Timestamp, n) + + require.Equal(t, peerIDA.Pretty(), evt.lotusTraceEvent.PeerScore.PeerID) + require.Equal(t, 1, len(evt.lotusTraceEvent.PeerScore.Topics)) + + topic := evt.lotusTraceEvent.PeerScore.Topics[0] + require.Equal(t, "topicA", topic.Topic) + require.Equal(t, float64(100), topic.FirstMessageDeliveries) + }) + + lt := NewLotusTracer( + []TracerTransport{testTransport}, + peerIDA, + "source-auth-token-test", + ) + + topics := make(map[string]*pubsub.TopicScoreSnapshot) + topics["topicA"] = &pubsub.TopicScoreSnapshot{ + FirstMessageDeliveries: float64(100), + } + + m := make(map[peer.ID]*pubsub.PeerScoreSnapshot) + m[peerIDA] = &pubsub.PeerScoreSnapshot{ + Score: float64(32), + Topics: topics, + } + + lt.PeerScores(m) +} + +func TestTracer_PubSubTrace(t *testing.T) { + n := time.Now().Unix() + + testTransport := NewTestTraceTransport(t, func(t *testing.T, evt TracerTransportEvent) { + require.Equal(t, []byte(peerIDA), evt.pubsubTraceEvent.PeerID) + require.Equal(t, &n, evt.pubsubTraceEvent.Timestamp) + }) + + lt := NewLotusTracer( + []TracerTransport{testTransport}, + "pid", + "source-auth", + ) + + lt.Trace(&pubsub_pb.TraceEvent{ + PeerID: []byte(peerIDA), + Timestamp: &n, + }) + +} + +func TestTracer_MultipleTransports(t *testing.T) { + testTransportA := NewTestTraceTransport(t, func(t *testing.T, evt TracerTransportEvent) { + require.Equal(t, []byte(peerIDA), evt.pubsubTraceEvent.PeerID) + }) + + testTransportB := NewTestTraceTransport(t, func(t *testing.T, evt TracerTransportEvent) { + require.Equal(t, []byte(peerIDA), evt.pubsubTraceEvent.PeerID) + }) + + executeTest := NewLotusTracer( + []TracerTransport{testTransportA, testTransportB}, + "pid", + "source-auth", + ) + + executeTest.Trace(&pubsub_pb.TraceEvent{ + PeerID: []byte(peerIDA), + }) +} diff --git a/node/modules/tracer/transport.go b/node/modules/tracer/transport.go new file mode 100644 index 000000000..56d926afc --- /dev/null +++ b/node/modules/tracer/transport.go @@ -0,0 +1,12 @@ +package tracer + +import pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb" + +type TracerTransport interface { + Transport(evt TracerTransportEvent) error +} + +type TracerTransportEvent struct { + lotusTraceEvent *LotusTraceEvent + pubsubTraceEvent *pubsub_pb.TraceEvent +} diff --git a/tools/kibana/README.md b/tools/kibana/README.md new file mode 100644 index 000000000..c556ae10c --- /dev/null +++ b/tools/kibana/README.md @@ -0,0 +1,20 @@ +## Lotus Kibana Dashboard + +This folder contains configuration files to create Kibana dashboards to track peer scores and block propagation +throughout Filecoin network. + +### Importing index template + +Index template needs to be imported into Elasticsearch for score weights and to +prevent Elasticsearch from infering wrong field type. + +The [template](./index-template.json) is loaded via [Kibana Index Management](https://www.elastic.co/guide/en/elasticsearch/reference/current/index-mgmt.html) and pasted +into newly created Index Template. + +### Importing dashboard + +The peer score and block propagation dashboard configuration is imported via Kibana import saved object [functionality](https://www.elastic.co/guide/en/kibana/current/managing-saved-objects.html#managing-saved-objects-export-objects). + +#### Custom index + +By default, the dashboards and index template target `lotus-pubsub` index which is the default one when running node. The index can be customised via edit on dashboard visualizations and also index template needs to be updated to target new index. diff --git a/tools/kibana/block-propagation-dashboard.ndjson b/tools/kibana/block-propagation-dashboard.ndjson new file mode 100644 index 000000000..c7b14c0c3 --- /dev/null +++ b/tools/kibana/block-propagation-dashboard.ndjson @@ -0,0 +1,2 @@ +{"attributes":{"description":"Force layout vega graph where link force distance between peers is block propagation","hits":0,"kibanaSavedObjectMeta":{"searchSourceJSON":"{\"query\":{\"query\":\"\",\"language\":\"kuery\"},\"filter\":[]}"},"optionsJSON":"{\"useMargins\":true,\"syncColors\":false,\"hidePanelTitles\":false}","panelsJSON":"[{\"version\":\"7.14.1\",\"type\":\"visualization\",\"gridData\":{\"x\":0,\"y\":0,\"w\":48,\"h\":42,\"i\":\"c7e4001d-38c9-4fa0-a488-e069dd50d274\"},\"panelIndex\":\"c7e4001d-38c9-4fa0-a488-e069dd50d274\",\"embeddableConfig\":{\"savedVis\":{\"title\":\"Block propagation\",\"description\":\"\",\"type\":\"vega\",\"params\":{\"spec\":\"{\\n $schema: https://vega.github.io/schema/vega/v5.json\\n title: Node block propagation\\n\\n\\n \\\"signals\\\": [\\n { \\\"name\\\": \\\"cx\\\", \\\"update\\\": \\\"width / 2\\\" },\\n { \\\"name\\\": \\\"cy\\\", \\\"update\\\": \\\"height / 2\\\" },\\n { \\\"name\\\": \\\"nodeRadius\\\", \\\"value\\\": 10,\\n \\\"bind\\\": {\\\"input\\\": \\\"range\\\", \\\"min\\\": 1, \\\"max\\\": 20, \\\"step\\\": 1} },\\n { \\\"name\\\": \\\"propagationMultiplier\\\", \\\"value\\\": 1,\\n \\\"bind\\\": {\\\"input\\\": \\\"range\\\", \\\"min\\\": 1, \\\"max\\\": 100, \\\"step\\\": 1} },\\n ],\\n\\n data: [\\n {\\n \\n name: \\\"node-data\\\",\\n url: {\\n index: lotus-pubsub\\n body: {\\n size: 0,\\n aggs: {\\n unique_peerID: {\\n terms: { \\n field: \\\"peerID\\\",\\n size: 10000\\n },\\n }\\n }\\n }\\n },\\n format: {\\\"property\\\": \\\"aggregations.unique_peerID.buckets\\\"},\\n transform: [\\n {\\n \\\"type\\\": \\\"project\\\",\\n \\\"fields\\\": [\\\"key\\\"],\\n \\\"as\\\": [\\\"peerID\\\"]\\n },\\n {\\n type: \\\"identifier\\\",\\n as: \\\"id\\\"\\n },\\n {\\n type: \\\"formula\\\",\\n expr: \\\"datum.id - 1\\\",\\n as: \\\"index\\\"\\n }\\n ]\\n },\\n {\\n name: \\\"published-message-data\\\",\\n url: {\\n index: lotus-pubsub\\n body: {\\n size: 10000,\\n query: {\\n bool: {\\n must: [\\n {\\n match: {\\n type: 0\\n }\\n }\\n ]\\n }\\n }\\n }\\n },\\n format: {\\\"property\\\": \\\"hits.hits\\\"},\\n },\\n {\\n name: \\\"link-data\\\",\\n url: {\\n index: lotus-pubsub\\n body: {\\n size: 10000,\\n query: {\\n bool: {\\n must: [\\n {\\n match: {\\n type: 1\\n }\\n }\\n ]\\n }\\n }\\n }\\n },\\n format: {\\\"property\\\": \\\"hits.hits\\\"},\\n transform: [\\n {\\n \\\"type\\\": \\\"lookup\\\",\\n \\\"from\\\": \\\"published-message-data\\\",\\n \\\"key\\\": \\\"_source.publishMessage.messageID\\\",\\n \\\"fields\\\": [\\\"_source.deliverMessage.messageID\\\"],\\n \\\"as\\\": [\\\"publishedMessage\\\"],\\n },\\n {\\n \\\"type\\\": \\\"filter\\\",\\n \\\"expr\\\": \\\"datum.publishedMessage != null\\\"\\n },\\n {\\n \\\"type\\\": \\\"lookup\\\",\\n \\\"from\\\": \\\"node-data\\\",\\n \\\"key\\\": \\\"peerID\\\",\\n \\\"fields\\\": [\\\"_source.peerID\\\"],\\n \\\"as\\\": [\\\"source\\\"],\\n },\\n {\\n \\\"type\\\": \\\"lookup\\\",\\n \\\"from\\\": \\\"node-data\\\",\\n \\\"key\\\": \\\"peerID\\\",\\n \\\"fields\\\": [\\\"publishedMessage._source.peerID\\\"],\\n \\\"as\\\": [\\\"target\\\"],\\n },\\n {\\n \\\"type\\\": \\\"formula\\\",\\n \\\"expr\\\": \\\"(datum._source.timestamp - datum.publishedMessage._source.timestamp) * propagationMultiplier\\\",\\n \\\"as\\\": \\\"distance\\\"\\n },\\n {\\n \\\"type\\\": \\\"project\\\",\\n \\\"fields\\\": [\\\"source.index\\\", \\\"target.index\\\", \\\"distance\\\"]\\n \\\"as\\\": [\\\"source\\\", \\\"target\\\", \\\"distance\\\"]\\n },\\n {\\n \\\"type\\\": \\\"aggregate\\\",\\n \\\"ops\\\": [\\\"average\\\"],\\n \\\"fields\\\": [\\\"distance\\\"],\\n \\\"groupby\\\": [\\\"source\\\", \\\"target\\\"],\\n \\\"as\\\": [\\\"distance\\\"]\\n }\\n ]\\n }\\n ]\\n\\n scales: [\\n {\\n \\\"name\\\": \\\"color\\\",\\n \\\"type\\\": \\\"ordinal\\\",\\n \\\"domain\\\": {\\\"data\\\": \\\"node-data\\\", \\\"field\\\": \\\"id\\\"},\\n \\\"range\\\": {\\\"scheme\\\": \\\"category20c\\\"}\\n }\\n ]\\n\\n marks: [\\n {\\n \\\"name\\\": \\\"nodes\\\",\\n \\\"type\\\": \\\"symbol\\\",\\n \\\"zindex\\\": 1,\\n \\n \\\"encode\\\": {\\n \\\"enter\\\": {\\n \\\"fill\\\": {\\\"scale\\\": \\\"color\\\", \\\"field\\\": \\\"id\\\"},\\n \\\"stroke\\\": {\\\"value\\\": \\\"black\\\"},\\n \\\"tooltip\\\": {\\\"signal\\\": \\\"{'PeerID': datum.peerID}\\\"}\\n },\\n \\\"update\\\": {\\n \\\"size\\\": {\\\"signal\\\": \\\"2 * nodeRadius * nodeRadius\\\"},\\n \\\"fill\\\": {\\\"scale\\\": \\\"color\\\", \\\"field\\\": \\\"id\\\"},\\n },\\n \\\"hover\\\": { \\\"fill\\\": {\\\"value\\\": \\\"red\\\"} },\\n },\\n\\n \\\"from\\\": {\\\"data\\\": \\\"node-data\\\"},\\n \\\"transform\\\": [\\n {\\n \\\"type\\\": \\\"force\\\",\\n \\\"restart\\\": false,\\n \\\"static\\\": true,\\n \\\"signal\\\": \\\"force\\\",\\n \\\"forces\\\": [\\n {\\\"force\\\": \\\"link\\\", \\\"links\\\": \\\"link-data\\\", distance: {field: \\\"distance\\\"}},\\n {\\\"force\\\": \\\"center\\\", \\\"x\\\": {\\\"signal\\\": \\\"cx\\\"}, \\\"y\\\": {\\\"signal\\\": \\\"cy\\\"}},\\n ]\\n }\\n ]\\n },\\n {\\n \\\"type\\\": \\\"path\\\",\\n \\\"from\\\": {\\\"data\\\": \\\"link-data\\\"},\\n \\\"encode\\\": {\\n \\\"enter\\\": {\\n \\\"tooltip\\\": {\\\"signal\\\": \\\"{'Block propagation': datum.distance}\\\"}\\n },\\n \\\"update\\\": {\\n \\\"stroke\\\": {\\\"value\\\": \\\"#ccc\\\"},\\n \\\"strokeWidth\\\": {\\\"value\\\": 1.5}\\n },\\n \\\"hover\\\": { \\\"stroke\\\": { \\\"value\\\": \\\"#0B33A0\\\" }}\\n },\\n \\\"transform\\\": [\\n {\\n \\\"type\\\": \\\"linkpath\\\",\\n \\\"require\\\": {\\\"signal\\\": \\\"force\\\"},\\n \\\"shape\\\": \\\"line\\\",\\n \\\"sourceX\\\": \\\"datum.source.x\\\", \\\"sourceY\\\": \\\"datum.source.y\\\",\\n \\\"targetX\\\": \\\"datum.target.x\\\", \\\"targetY\\\": \\\"datum.target.y\\\"\\n }\\n ]\\n }\\n ]\\n}\\n\"},\"uiState\":{},\"data\":{\"aggs\":[],\"searchSource\":{\"query\":{\"language\":\"kuery\",\"query\":\"\"},\"filter\":[]}}},\"enhancements\":{}}}]","timeRestore":false,"title":"Peer block propagation","version":1},"coreMigrationVersion":"7.14.1","id":"937b1470-212b-11ec-99f4-75d57f0cd0d8","migrationVersion":{"dashboard":"7.14.0"},"references":[],"type":"dashboard","updated_at":"2021-09-29T13:45:58.342Z","version":"Wzg4NzMsMV0="} +{"excludedObjects":[],"excludedObjectsCount":0,"exportedCount":1,"missingRefCount":0,"missingReferences":[]} diff --git a/tools/kibana/index-template.json b/tools/kibana/index-template.json new file mode 100644 index 000000000..f08298ad1 --- /dev/null +++ b/tools/kibana/index-template.json @@ -0,0 +1,66 @@ +{ + "template": { + "settings": {}, + "mappings": { + "runtime": { + "peerScore.weightedScore": { + "type": "double", + "script": { + "source": "if (doc['type'].value == 100) {\n def score = doc['peerScore.score'].value;\n if (doc['sourceAuth'] == \"\") {\n\n emit(score * 1.2)\n } else {\n emit(score)\n }\n}\n", + "lang": "painless" + } + } + }, + "properties": { + "peerScore": { + "properties": { + "appSpecificScore": { + "type": "double" + }, + "behaviourPenalty": { + "type": "double" + }, + "ipColocationFactor": { + "type": "double" + }, + "score": { + "type": "double" + }, + "topics": { + "type": "nested", + "properties": { + "firstMessageDeliveries": { + "type": "double", + "ignore_malformed": false, + "coerce": true + }, + "invalidMessageDeliveries": { + "type": "double", + "ignore_malformed": false, + "coerce": true + }, + "meshMessageDeliveries": { + "type": "double", + "ignore_malformed": false, + "coerce": true + }, + "timeInMesh": { + "type": "double", + "ignore_malformed": false, + "coerce": true + }, + "topic": { + "type": "keyword" + } + } + } + } + }, + "sourceAuth": { + "type": "keyword" + } + } + }, + "aliases": {} + } +} diff --git a/tools/kibana/peer-scores-dashboard.ndjson b/tools/kibana/peer-scores-dashboard.ndjson new file mode 100644 index 000000000..4df376ab3 --- /dev/null +++ b/tools/kibana/peer-scores-dashboard.ndjson @@ -0,0 +1,2 @@ +{"attributes":{"description":"Average peer score table per node peerID","hits":0,"kibanaSavedObjectMeta":{"searchSourceJSON":"{\"query\":{\"query\":\"\",\"language\":\"kuery\"},\"filter\":[]}"},"optionsJSON":"{\"useMargins\":true,\"syncColors\":false,\"hidePanelTitles\":false}","panelsJSON":"[{\"version\":\"7.14.1\",\"type\":\"lens\",\"gridData\":{\"x\":0,\"y\":0,\"w\":48,\"h\":43,\"i\":\"cddc98a5-45f3-4ba7-a7c6-6b9d216b19da\"},\"panelIndex\":\"cddc98a5-45f3-4ba7-a7c6-6b9d216b19da\",\"embeddableConfig\":{\"attributes\":{\"title\":\"\",\"type\":\"lens\",\"visualizationType\":\"lnsDatatable\",\"state\":{\"datasourceStates\":{\"indexpattern\":{\"layers\":{\"666e2f39-8868-45ad-b747-fe124830b0ae\":{\"columns\":{\"504c50bd-14c1-4119-820e-c961866fc3b4\":{\"label\":\"peerID\",\"dataType\":\"string\",\"operationType\":\"terms\",\"scale\":\"ordinal\",\"sourceField\":\"peerScore.peerID.keyword\",\"isBucketed\":true,\"params\":{\"size\":100,\"orderBy\":{\"type\":\"column\",\"columnId\":\"ec82c5f7-b3c4-4715-8646-a8fe584400fc\"},\"orderDirection\":\"desc\",\"otherBucket\":false,\"missingBucket\":false},\"customLabel\":true},\"ec82c5f7-b3c4-4715-8646-a8fe584400fc\":{\"label\":\"Score\",\"dataType\":\"number\",\"operationType\":\"average\",\"sourceField\":\"peerScore.score\",\"isBucketed\":false,\"scale\":\"ratio\",\"customLabel\":true},\"e22a19e8-1d71-43d6-9ca0-b30ddd423447\":{\"label\":\"Weighted Score\",\"dataType\":\"number\",\"operationType\":\"average\",\"sourceField\":\"peerScore.weightedScore\",\"isBucketed\":false,\"scale\":\"ratio\",\"customLabel\":true}},\"columnOrder\":[\"504c50bd-14c1-4119-820e-c961866fc3b4\",\"ec82c5f7-b3c4-4715-8646-a8fe584400fc\",\"e22a19e8-1d71-43d6-9ca0-b30ddd423447\"],\"incompleteColumns\":{}}}}},\"visualization\":{\"columns\":[{\"isTransposed\":false,\"columnId\":\"504c50bd-14c1-4119-820e-c961866fc3b4\"},{\"isTransposed\":false,\"columnId\":\"ec82c5f7-b3c4-4715-8646-a8fe584400fc\",\"colorMode\":\"cell\",\"palette\":{\"type\":\"palette\",\"name\":\"positive\",\"params\":{\"stops\":[{\"color\":\"#d6e9e4\",\"stop\":20},{\"color\":\"#aed3ca\",\"stop\":40},{\"color\":\"#85bdb1\",\"stop\":60},{\"color\":\"#5aa898\",\"stop\":80},{\"color\":\"#209280\",\"stop\":100}]}}},{\"columnId\":\"e22a19e8-1d71-43d6-9ca0-b30ddd423447\",\"isTransposed\":false,\"colorMode\":\"cell\",\"palette\":{\"type\":\"palette\",\"name\":\"positive\",\"params\":{\"stops\":[{\"color\":\"#d6e9e4\",\"stop\":20},{\"color\":\"#aed3ca\",\"stop\":40},{\"color\":\"#85bdb1\",\"stop\":60},{\"color\":\"#5aa898\",\"stop\":80},{\"color\":\"#209280\",\"stop\":100}]}}}],\"layerId\":\"666e2f39-8868-45ad-b747-fe124830b0ae\"},\"query\":{\"query\":\"\",\"language\":\"kuery\"},\"filters\":[]},\"references\":[{\"type\":\"index-pattern\",\"id\":\"9890c040-17b7-11ec-99f4-75d57f0cd0d8\",\"name\":\"indexpattern-datasource-current-indexpattern\"},{\"type\":\"index-pattern\",\"id\":\"2c407db0-1acb-11ec-99f4-75d57f0cd0d8\",\"name\":\"indexpattern-datasource-layer-666e2f39-8868-45ad-b747-fe124830b0ae\"}]},\"hidePanelTitles\":false,\"enhancements\":{}},\"title\":\"Peer Scores\"}]","timeRestore":false,"title":"Peer Scores","version":1},"coreMigrationVersion":"7.14.1","id":"e7e4fd70-1acb-11ec-99f4-75d57f0cd0d8","migrationVersion":{"dashboard":"7.14.0"},"references":[{"id":"2c407db0-1acb-11ec-99f4-75d57f0cd0d8","name":"cddc98a5-45f3-4ba7-a7c6-6b9d216b19da:indexpattern-datasource-current-indexpattern","type":"index-pattern"},{"id":"2c407db0-1acb-11ec-99f4-75d57f0cd0d8","name":"cddc98a5-45f3-4ba7-a7c6-6b9d216b19da:indexpattern-datasource-layer-666e2f39-8868-45ad-b747-fe124830b0ae","type":"index-pattern"}],"type":"dashboard","updated_at":"2021-09-24T12:06:00.625Z","version":"WzczNDgsMV0="} +{"excludedObjects":[],"excludedObjectsCount":0,"exportedCount":1,"missingRefCount":0,"missingReferences":[]}