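// Source path: lotus/node/modules/tracer/tracer.go (120 lines, 3.3 KiB, Go).
//
// This text was extracted from a repository web view ("Raw / Normal View /
// History"). Bare "YYYY-MM-DD hh:mm:ss +00:00" lines are per-line commit
// timestamps left over from that view; they are not part of the Go source.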

2021-09-10 10:36:38 +00:00
package tracer
2021-08-30 12:35:15 +00:00
import (
"time"
2021-09-10 13:37:38 +00:00
logging "github.com/ipfs/go-log/v2"
2021-08-30 12:35:15 +00:00
pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
2022-12-09 20:20:51 +00:00
"github.com/libp2p/go-libp2p/core/peer"
2021-08-30 12:35:15 +00:00
)
2021-09-10 13:37:38 +00:00
// log is the package-level logger, registered under the name "lotus-tracer".
var log = logging.Logger("lotus-tracer")
2021-09-16 09:51:59 +00:00
func NewLotusTracer(tt []TracerTransport, pid peer.ID, sourceAuth string) LotusTracer {
2021-08-30 12:35:15 +00:00
return &lotusTracer{
tt: tt,
pid: pid,
2021-09-16 09:51:59 +00:00
sa: sourceAuth,
2021-08-30 12:35:15 +00:00
}
}
type lotusTracer struct {
2021-09-15 12:50:27 +00:00
tt []TracerTransport
pid peer.ID
2021-09-16 09:51:59 +00:00
sa string
}
const (
2021-09-29 11:12:42 +00:00
TraceEventPeerScores pubsub_pb.TraceEvent_Type = 100
)
type LotusTraceEvent struct {
Type pubsub_pb.TraceEvent_Type `json:"type,omitempty"`
PeerID []byte `json:"peerID,omitempty"`
Timestamp *int64 `json:"timestamp,omitempty"`
2021-09-29 11:12:42 +00:00
PeerScore TraceEventPeerScore `json:"peerScore,omitempty"`
2021-09-16 09:51:59 +00:00
SourceAuth string `json:"sourceAuth,omitempty"`
}
// TopicScore is the per-topic part of a peer's score as carried in a
// peer-score trace event. Every field is always present in the JSON encoding
// (none of the tags use omitempty).
type TopicScore struct {
	// Topic is the name of the topic the remaining fields refer to.
	Topic string `json:"topic"`

	// TimeInMesh is encoded by encoding/json as an integer number of
	// nanoseconds, as for any time.Duration.
	TimeInMesh time.Duration `json:"timeInMesh"`

	// FirstMessageDeliveries is the snapshot's first-message-deliveries value.
	FirstMessageDeliveries float64 `json:"firstMessageDeliveries"`

	// MeshMessageDeliveries is the snapshot's mesh-message-deliveries value.
	MeshMessageDeliveries float64 `json:"meshMessageDeliveries"`

	// InvalidMessageDeliveries is the snapshot's invalid-message-deliveries value.
	InvalidMessageDeliveries float64 `json:"invalidMessageDeliveries"`
}
2021-09-29 11:12:42 +00:00
type TraceEventPeerScore struct {
PeerID []byte `json:"peerID"`
Score float64 `json:"score"`
AppSpecificScore float64 `json:"appSpecificScore"`
IPColocationFactor float64 `json:"ipColocationFactor"`
BehaviourPenalty float64 `json:"behaviourPenalty"`
Topics []TopicScore `json:"topics"`
2021-08-30 12:35:15 +00:00
}
// LotusTracer is the tracing interface exposed by this package. It accepts
// pubsub-native trace events, Lotus-specific trace events, and peer score
// snapshots.
type LotusTracer interface {
	// Trace accepts a pubsub trace event.
	Trace(evt *pubsub_pb.TraceEvent)
	// TraceLotusEvent accepts a Lotus-specific trace event.
	TraceLotusEvent(evt *LotusTraceEvent)
	// PeerScores accepts peer score snapshots keyed by peer ID.
	PeerScores(scores map[peer.ID]*pubsub.PeerScoreSnapshot)
}
func (lt *lotusTracer) PeerScores(scores map[peer.ID]*pubsub.PeerScoreSnapshot) {
now := time.Now().UnixNano()
for pid, score := range scores {
var topics []TopicScore
for topic, snapshot := range score.Topics {
topics = append(topics, TopicScore{
Topic: topic,
TimeInMesh: snapshot.TimeInMesh,
FirstMessageDeliveries: snapshot.FirstMessageDeliveries,
MeshMessageDeliveries: snapshot.MeshMessageDeliveries,
InvalidMessageDeliveries: snapshot.InvalidMessageDeliveries,
})
}
evt := &LotusTraceEvent{
2021-09-29 11:12:42 +00:00
Type: *TraceEventPeerScores.Enum(),
Timestamp: &now,
2023-02-23 13:02:20 +00:00
PeerID: []byte(lt.pid),
SourceAuth: lt.sa,
PeerScore: TraceEventPeerScore{
PeerID: []byte(pid),
Score: score.Score,
AppSpecificScore: score.AppSpecificScore,
IPColocationFactor: score.IPColocationFactor,
BehaviourPenalty: score.BehaviourPenalty,
Topics: topics,
},
}
lt.TraceLotusEvent(evt)
}
2021-08-30 12:35:15 +00:00
}
func (lt *lotusTracer) TraceLotusEvent(evt *LotusTraceEvent) {
2021-09-15 12:50:27 +00:00
for _, t := range lt.tt {
err := t.Transport(TracerTransportEvent{
lotusTraceEvent: evt,
pubsubTraceEvent: nil,
})
2021-09-15 12:50:27 +00:00
if err != nil {
log.Errorf("error while transporting peer scores: %s", err)
}
2021-09-10 13:37:38 +00:00
}
}
func (lt *lotusTracer) Trace(evt *pubsub_pb.TraceEvent) {
2021-09-15 12:50:27 +00:00
for _, t := range lt.tt {
err := t.Transport(TracerTransportEvent{
lotusTraceEvent: nil,
pubsubTraceEvent: evt,
})
2021-09-15 12:50:27 +00:00
if err != nil {
log.Errorf("error while transporting trace event: %s", err)
}
2021-09-10 13:37:38 +00:00
}
}