Merge pull request #6 from ChainSafe/mpetrun5/tracer-json-transport

Json tracer transport
This commit is contained in:
mace 2021-09-15 14:25:00 +02:00 committed by GitHub
commit d2702209a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 112 additions and 14 deletions

View File

@ -525,6 +525,12 @@ Type: Array of multiaddress peerinfo strings, must include peerid (/p2p/12D3K...
Name: "RemoteTracer",
Type: "string",
Comment: ``,
},
{
Name: "JsonTracerFile",
Type: "string",
Comment: ``,
},
},

View File

@ -309,6 +309,7 @@ type Pubsub struct {
DirectPeers []string
IPColocationWhitelist []string
RemoteTracer string
JsonTracerFile string
}
type Chainstore struct {

View File

@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"net"
"os"
"time"
host "github.com/libp2p/go-libp2p-core/host"
@ -21,6 +22,7 @@ import (
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/node/modules/tracer"
)
func init() {
@ -55,10 +57,10 @@ type PeerScoreTracker interface {
type peerScoreTracker struct {
sk *dtypes.ScoreKeeper
lt LotusTracer
lt tracer.LotusTracer
}
func newPeerScoreTracker(lt LotusTracer, sk *dtypes.ScoreKeeper) PeerScoreTracker {
func newPeerScoreTracker(lt tracer.LotusTracer, sk *dtypes.ScoreKeeper) PeerScoreTracker {
return &peerScoreTracker{
sk: sk,
lt: lt,
@ -67,7 +69,7 @@ func newPeerScoreTracker(lt LotusTracer, sk *dtypes.ScoreKeeper) PeerScoreTracke
func (pst *peerScoreTracker) UpdatePeerScore(scores map[peer.ID]*pubsub.PeerScoreSnapshot) {
if pst.lt != nil {
pst.lt.TracePeerScore(scores)
pst.lt.PeerScores(scores)
}
pst.sk.Update(scores)
@ -364,6 +366,17 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
pubsub.NewAllowlistSubscriptionFilter(allowTopics...),
100)))
var lt tracer.LotusTracer
if in.Cfg.JsonTracerFile != "" {
out, err := os.OpenFile(in.Cfg.JsonTracerFile, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0660)
if err != nil {
return nil, err
}
jsonTransport := tracer.NewJsonTracerTransport(out)
lt = tracer.NewLotusTracer(jsonTransport, in.Host.ID())
}
// tracer
if in.Cfg.RemoteTracer != "" {
a, err := ma.NewMultiaddr(in.Cfg.RemoteTracer)
@ -381,7 +394,6 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
return nil, err
}
lt := newLotusTracer(tr)
pst := newPeerScoreTracker(lt, in.Sk)
trw := newTracerWrapper(tr, lt, build.BlocksTopic(in.Nn))

View File

@ -0,0 +1,20 @@
package tracer
import (
"os"
)
type jsonTracerTransport struct {
out *os.File
}
func NewJsonTracerTransport(out *os.File) TracerTransport {
return &jsonTracerTransport{
out: out,
}
}
func (jtt *jsonTracerTransport) Transport(jsonEvent []byte) error {
_, err := jtt.out.Write(jsonEvent)
return err
}

View File

@ -1,28 +1,87 @@
package tracer
import (
peer "github.com/libp2p/go-libp2p-core/peer"
"encoding/json"
"time"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
)
func newLotusTracer(et pubsub.EventTracer, tt TracerTransport) LotusTracer {
var log = logging.Logger("lotus-tracer")
func NewLotusTracer(tt TracerTransport, pid peer.ID) LotusTracer {
return &lotusTracer{
et: et,
tt: tt,
tt: tt,
pid: pid,
}
}
type lotusTracer struct {
et pubsub.EventTracer
tt TracerTransport
tt TracerTransport
pid peer.ID
}
const (
TraceEvent_PEER_SCORES pubsub_pb.TraceEvent_Type = 100
)
type LotusTraceEvent struct {
Type pubsub_pb.TraceEvent_Type `json:"type,omitempty"`
PeerID []byte `json:"peerID,omitempty"`
Timestamp *int64 `json:"timestamp,omitempty"`
PeerScores *TraceEvent_PeerScores `json:"peerScores,omitempty"`
}
type TraceEvent_PeerScores struct {
Scores map[peer.ID]*pubsub.PeerScoreSnapshot `json:"scores,omitempty"`
}
type LotusTracer interface {
TracePeerScore(scores map[peer.ID]*pubsub.PeerScoreSnapshot)
Trace(evt *pubsub_pb.TraceEvent)
TraceLotusEvent(evt *LotusTraceEvent)
PeerScores(scores map[peer.ID]*pubsub.PeerScoreSnapshot)
}
func (lt *lotusTracer) TracePeerScore(scores map[peer.ID]*pubsub.PeerScoreSnapshot) {}
func (lt *lotusTracer) PeerScores(scores map[peer.ID]*pubsub.PeerScoreSnapshot) {
now := time.Now().UnixNano()
evt := &LotusTraceEvent{
Type: *TraceEvent_PEER_SCORES.Enum(),
PeerID: []byte(lt.pid),
Timestamp: &now,
PeerScores: &TraceEvent_PeerScores{
Scores: scores,
},
}
func (lt *lotusTracer) Trace(evt *pubsub_pb.TraceEvent) {}
lt.TraceLotusEvent(evt)
}
func (lt *lotusTracer) TraceLotusEvent(evt *LotusTraceEvent) {
jsonEvent, err := json.Marshal(evt)
if err != nil {
log.Errorf("error while marshaling peer score: %s", err)
return
}
err = lt.tt.Transport(jsonEvent)
if err != nil {
log.Errorf("error while transporting peer scores: %s", err)
}
}
func (lt *lotusTracer) Trace(evt *pubsub_pb.TraceEvent) {
jsonEvent, err := json.Marshal(evt)
if err != nil {
log.Errorf("error while marshaling tracer event: %s", err)
return
}
err = lt.tt.Transport(jsonEvent)
if err != nil {
log.Errorf("error while transporting trace event: %s", err)
}
}

View File

@ -1,5 +1,5 @@
package tracer
type TracerTransport interface {
Transport(jsonEvent []byte)
Transport(jsonEvent []byte) error
}