diff --git a/node/modules/tracer/elasticsearch_transport.go b/node/modules/tracer/elasticsearch_transport.go index 8e214a4e9..f9cf621eb 100644 --- a/node/modules/tracer/elasticsearch_transport.go +++ b/node/modules/tracer/elasticsearch_transport.go @@ -12,6 +12,9 @@ import ( const ( ElasticSearch_INDEX = "pubsub" + + ElasticSearch_DOC_LOTUS = "doc_lotus" + ElasticSearch_DOC_PUBSUB = "doc_pubsub" ) func NewElasticSearchTransport() (TracerTransport, error) { @@ -30,10 +33,27 @@ type elasticSearchTransport struct { cl *elasticsearch.Client } -func (est *elasticSearchTransport) Transport(jsonEvent []byte) error { +func (est *elasticSearchTransport) Transport(event TracerTransportEvent) error { + var e interface{} + var docId string + if event.lotusTraceEvent != nil { + e = *event.lotusTraceEvent + docId = ElasticSearch_DOC_LOTUS + } else if event.pubsubTraceEvent != nil { + e = *event.pubsubTraceEvent + docId = ElasticSearch_DOC_PUBSUB + } else { + return nil + } + + jsonEvent, err := json.Marshal(e) + if err != nil { + return fmt.Errorf("error while marshaling peer score: %s", err) + } + req := esapi.IndexRequest{ Index: ElasticSearch_INDEX, - DocumentID: "1", // todo + DocumentID: docId, Body: strings.NewReader(string(jsonEvent)), Refresh: "true", } diff --git a/node/modules/tracer/json_transport.go b/node/modules/tracer/json_transport.go index 2ab004d9c..128c37473 100644 --- a/node/modules/tracer/json_transport.go +++ b/node/modules/tracer/json_transport.go @@ -1,6 +1,8 @@ package tracer import ( + "encoding/json" + "fmt" "os" ) @@ -14,7 +16,21 @@ func NewJsonTracerTransport(out *os.File) TracerTransport { } } -func (jtt *jsonTracerTransport) Transport(jsonEvent []byte) error { - _, err := jtt.out.Write(jsonEvent) +func (jtt *jsonTracerTransport) Transport(event TracerTransportEvent) error { + var e interface{} + if event.lotusTraceEvent != nil { + e = *event.lotusTraceEvent + } else if event.pubsubTraceEvent != nil { + e = *event.pubsubTraceEvent + } else { + return nil + } + + jsonEvent, err := json.Marshal(e) + if err != nil { + return fmt.Errorf("error while marshaling event: %s", err) + } + + _, err = jtt.out.Write(jsonEvent) return err } diff --git a/node/modules/tracer/tracer.go b/node/modules/tracer/tracer.go index 8c6250151..ad5f5c6b7 100644 --- a/node/modules/tracer/tracer.go +++ b/node/modules/tracer/tracer.go @@ -1,7 +1,6 @@ package tracer import ( - "encoding/json" "time" logging "github.com/ipfs/go-log/v2" @@ -61,14 +60,11 @@ func (lt *lotusTracer) PeerScores(scores map[peer.ID]*pubsub.PeerScoreSnapshot) } func (lt *lotusTracer) TraceLotusEvent(evt *LotusTraceEvent) { - jsonEvent, err := json.Marshal(evt) - if err != nil { - log.Errorf("error while marshaling peer score: %s", err) - return - } - for _, t := range lt.tt { - err = t.Transport(jsonEvent) + err := t.Transport(TracerTransportEvent{ + lotusTraceEvent: evt, + pubsubTraceEvent: nil, + }) if err != nil { log.Errorf("error while transporting peer scores: %s", err) } @@ -77,14 +73,11 @@ func (lt *lotusTracer) TraceLotusEvent(evt *LotusTraceEvent) { } func (lt *lotusTracer) Trace(evt *pubsub_pb.TraceEvent) { - jsonEvent, err := json.Marshal(evt) - if err != nil { - log.Errorf("error while marshaling tracer event: %s", err) - return - } - for _, t := range lt.tt { - err = t.Transport(jsonEvent) + err := t.Transport(TracerTransportEvent{ + lotusTraceEvent: nil, + pubsubTraceEvent: evt, + }) if err != nil { log.Errorf("error while transporting trace event: %s", err) } diff --git a/node/modules/tracer/transport.go b/node/modules/tracer/transport.go index c8495821e..e67867fd7 100644 --- a/node/modules/tracer/transport.go +++ b/node/modules/tracer/transport.go @@ -1,5 +1,12 @@ package tracer +import pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb" + type TracerTransport interface { - Transport(jsonEvent []byte) error + Transport(jsonEvent TracerTransportEvent) error +} + +type TracerTransportEvent struct { + lotusTraceEvent *LotusTraceEvent + pubsubTraceEvent *pubsub_pb.TraceEvent }