From c2e2725855b4c6f47c471700025983df476ea608 Mon Sep 17 00:00:00 2001 From: cortze Date: Mon, 24 Apr 2023 17:34:56 +0200 Subject: [PATCH 1/6] update traces, es transport with batches and fasthttp --- go.mod | 2 + go.sum | 4 + node/modules/lp2p/pubsub.go | 204 ++++++++++++++---- .../modules/tracer/elasticsearch_transport.go | 65 +++--- node/modules/tracer/fasthttp_transport.go | 74 +++++++ 5 files changed, 276 insertions(+), 73 deletions(-) create mode 100644 node/modules/tracer/fasthttp_transport.go diff --git a/go.mod b/go.mod index f97f57fc3..1b091e095 100644 --- a/go.mod +++ b/go.mod @@ -144,6 +144,7 @@ require ( github.com/stretchr/testify v1.8.2 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 github.com/urfave/cli/v2 v2.16.3 + github.com/valyala/fasthttp v1.45.0 github.com/whyrusleeping/bencher v0.0.0-20190829221104-bb6607aa8bba github.com/whyrusleeping/cbor-gen v0.0.0-20230126041949-52956bd4c9aa github.com/whyrusleeping/ledger-filecoin-go v0.9.1-0.20201010031517-c3dcc1bddce4 @@ -179,6 +180,7 @@ require ( github.com/StackExchange/wmi v1.2.1 // indirect github.com/akavel/rsrc v0.8.0 // indirect github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect + github.com/andybalholm/brotli v1.0.5 // indirect github.com/armon/go-metrics v0.3.9 // indirect github.com/benbjohnson/clock v1.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect diff --git a/go.sum b/go.sum index 9665d04be..4b4dd2a9f 100644 --- a/go.sum +++ b/go.sum @@ -93,6 +93,8 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 h1:s6gZFSlWYmbqAuRjVTiNNhvNRfY2Wxp9nhfyel4rklc= github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE= +github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= +github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= @@ -1634,6 +1636,8 @@ github.com/urfave/cli/v2 v2.16.3 h1:gHoFIwpPjoyIMbJp/VFd+/vuD0dAgFK4B6DpEMFJfQk= github.com/urfave/cli/v2 v2.16.3/go.mod h1:1CNUng3PtjQMtRzJO4FMXBQvkGtuYRxxiR9xMa7jMwI= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/valyala/fasthttp v1.45.0 h1:zPkkzpIn8tdHZUrVa6PzYd0i5verqiPSkgTd3bSUcpA= +github.com/valyala/fasthttp v1.45.0/go.mod h1:k2zXd82h/7UZc3VOdJ2WaUqt1uZ/XpXAfE9i+HBC3lA= github.com/valyala/fasttemplate v1.0.1 h1:tY9CJiPnMXf1ERmG2EyK7gNUd+c6RKGD0IfU8WdUSz8= github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU= diff --git a/node/modules/lp2p/pubsub.go b/node/modules/lp2p/pubsub.go index 5376d93c8..038c129ac 100644 --- a/node/modules/lp2p/pubsub.go +++ b/node/modules/lp2p/pubsub.go @@ -381,9 +381,9 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) { allowTopics = append(allowTopics, drandTopics...) options = append(options, pubsub.WithSubscriptionFilter( - pubsub.WrapLimitSubscriptionFilter( - pubsub.NewAllowlistSubscriptionFilter(allowTopics...), - 100))) + pubsub.WrapLimitSubscriptionFilter( + pubsub.NewAllowlistSubscriptionFilter(allowTopics...), + 100))) var transports []tracer.TracerTransport if in.Cfg.JsonTracer != "" { @@ -394,6 +394,20 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) { transports = append(transports, jsonTransport) } + + tps := make([]string, 0) // range of topics that will be submited to the traces + addTopicToList := func(topicList []string, newTopic string) []string { + // check if the topic is already in the list + for _, tp := range topicList { + if tp == newTopic { + return topicList + } + } + // add it otherwise + return append(topicList, newTopic) + } + + // tracer if in.Cfg.ElasticSearchTracer != "" { elasticSearchTransport, err := tracer.NewElasticSearchTransport( in.Cfg.ElasticSearchTracer, @@ -403,7 +417,9 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) { return nil, err } transports = append(transports, elasticSearchTransport) + tps = addTopicToList(tps, build.BlocksTopic(in.Nn)) } + lt := tracer.NewLotusTracer(transports, in.Host.ID(), in.Cfg.TracerSourceAuth) // tracer @@ -412,28 +428,25 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) { if err != nil { return nil, err } - pi, err := peer.AddrInfoFromP2pAddr(a) if err != nil { return nil, err } - tr, err := pubsub.NewRemoteTracer(context.TODO(), in.Host, *pi) if err != nil { return nil, err } - pst := newPeerScoreTracker(lt, in.Sk) - trw := newTracerWrapper(tr, lt, build.BlocksTopic(in.Nn)) + trw := newTracerWrapper(tr, lt, tps...) options = append(options, pubsub.WithEventTracer(trw)) options = append(options, pubsub.WithPeerScoreInspect(pst.UpdatePeerScore, 10*time.Second)) } else { // still instantiate a tracer for collecting metrics - trw := newTracerWrapper(nil, lt) - options = append(options, pubsub.WithEventTracer(trw)) - pst := newPeerScoreTracker(lt, in.Sk) + trw := newTracerWrapper(nil, lt, tps...) + + options = append(options, pubsub.WithEventTracer(trw)) options = append(options, pubsub.WithPeerScoreInspect(pst.UpdatePeerScore, 10*time.Second)) } @@ -457,7 +470,6 @@ func newTracerWrapper( topicsMap[topic] = struct{}{} } } - return &tracerWrapper{lp2pTracer: lp2pTracer, lotusTracer: lotusTracer, topics: topicsMap} } @@ -486,71 +498,169 @@ func (trw *tracerWrapper) Trace(evt *pubsub_pb.TraceEvent) { if trw.lp2pTracer != nil { trw.lp2pTracer.Trace(evt) } - if trw.lotusTracer != nil { trw.lotusTracer.Trace(evt) } } + case pubsub_pb.TraceEvent_DELIVER_MESSAGE: stats.Record(context.TODO(), metrics.PubsubDeliverMessage.M(1)) - if trw.traceMessage(evt.GetDeliverMessage().GetTopic()) { - if trw.lp2pTracer != nil { - trw.lp2pTracer.Trace(evt) - } - - if trw.lotusTracer != nil { - trw.lotusTracer.Trace(evt) - } - } + case pubsub_pb.TraceEvent_REJECT_MESSAGE: stats.Record(context.TODO(), metrics.PubsubRejectMessage.M(1)) if trw.traceMessage(evt.GetRejectMessage().GetTopic()) { if trw.lp2pTracer != nil { trw.lp2pTracer.Trace(evt) } + if trw.lotusTracer != nil { + trw.lotusTracer.Trace(evt) + } + } + + case pubsub_pb.TraceEvent_DUPLICATE_MESSAGE: + stats.Record(context.TODO(), metrics.PubsubDuplicateMessage.M(1)) + if trw.traceMessage(evt.GetDuplicateMessage().GetTopic()) { + if trw.lp2pTracer != nil { + trw.lp2pTracer.Trace(evt) + } + if trw.lotusTracer != nil { + trw.lotusTracer.Trace(evt) + } + } + + case pubsub_pb.TraceEvent_JOIN: + if trw.traceMessage(evt.GetJoin().GetTopic()) { + if trw.lp2pTracer != nil { + trw.lp2pTracer.Trace(evt) + } + if trw.lotusTracer != nil { + trw.lotusTracer.Trace(evt) + } + } + case pubsub_pb.TraceEvent_LEAVE: + if trw.traceMessage(evt.GetLeave().GetTopic()) { + if trw.lp2pTracer != nil { + trw.lp2pTracer.Trace(evt) + } + if trw.lotusTracer != nil { + trw.lotusTracer.Trace(evt) + } + } + + case pubsub_pb.TraceEvent_GRAFT: + if trw.traceMessage(evt.GetGraft().GetTopic()) { + if trw.lp2pTracer != nil { + trw.lp2pTracer.Trace(evt) + } if trw.lotusTracer != nil { trw.lotusTracer.Trace(evt) } } - case pubsub_pb.TraceEvent_DUPLICATE_MESSAGE: - stats.Record(context.TODO(), metrics.PubsubDuplicateMessage.M(1)) - case pubsub_pb.TraceEvent_JOIN: - if trw.lp2pTracer != nil { - trw.lp2pTracer.Trace(evt) - } - if trw.lotusTracer != nil { - trw.lotusTracer.Trace(evt) - } - case pubsub_pb.TraceEvent_LEAVE: - if trw.lp2pTracer != nil { - trw.lp2pTracer.Trace(evt) - } - - if trw.lotusTracer != nil { - trw.lotusTracer.Trace(evt) - } - case pubsub_pb.TraceEvent_GRAFT: - if trw.lp2pTracer != nil { - trw.lp2pTracer.Trace(evt) - } - - if trw.lotusTracer != nil { - trw.lotusTracer.Trace(evt) - } case pubsub_pb.TraceEvent_PRUNE: + if trw.traceMessage(evt.GetPrune().GetTopic()) { + if trw.lp2pTracer != nil { + trw.lp2pTracer.Trace(evt) + } + if trw.lotusTracer != nil { + trw.lotusTracer.Trace(evt) + } + } + + case pubsub_pb.TraceEvent_ADD_PEER: if trw.lp2pTracer != nil { trw.lp2pTracer.Trace(evt) } - if trw.lotusTracer != nil { trw.lotusTracer.Trace(evt) } + + case pubsub_pb.TraceEvent_REMOVE_PEER: + if trw.lp2pTracer != nil { + trw.lp2pTracer.Trace(evt) + } + if trw.lotusTracer != nil { + trw.lotusTracer.Trace(evt) + } + case pubsub_pb.TraceEvent_RECV_RPC: stats.Record(context.TODO(), metrics.PubsubRecvRPC.M(1)) + // only track the RPC Calls from IWANT / IHAVE / BLOCK topic + controlRPC := evt.GetRecvRPC().GetMeta().GetControl() + ihave := controlRPC.GetIhave() + iwant := controlRPC.GetIwant() + msgsRPC := evt.GetRecvRPC().GetMeta().GetMessages() + + // check if any of the messages we are sending belong to a trackable topic + var validTopic bool = false + for _, topic := range msgsRPC { + if trw.traceMessage(topic.GetTopic()) { + validTopic = true + break + } + } + // track if the Iwant / Ihave messages are from a valid Topic + var validIhave bool = false + for _, msgs := range ihave { + if trw.traceMessage(msgs.GetTopic()) { + validIhave = true + break + } + } + // check if we have any of iwant msgs (it doesn't classify per topic - just msg.ID) + var validIwant bool = false + if len(iwant) > 0 { + validIwant = true + } + + // trace the event if any of the flags was triggered + if validIhave || validIwant || validTopic { + if trw.lp2pTracer != nil { + trw.lp2pTracer.Trace(evt) + } + if trw.lotusTracer != nil { + trw.lotusTracer.Trace(evt) + } + } case pubsub_pb.TraceEvent_SEND_RPC: stats.Record(context.TODO(), metrics.PubsubSendRPC.M(1)) + // only track the RPC Calls from IWANT / IHAVE / BLOCK topic + controlRPC := evt.GetSendRPC().GetMeta().GetControl() + ihave := controlRPC.GetIhave() + iwant := controlRPC.GetIwant() + msgsRPC := evt.GetSendRPC().GetMeta().GetMessages() + + // check if any of the messages we are sending belong to a trackable topic + var validTopic bool = false + for _, topic := range msgsRPC { + if trw.traceMessage(topic.GetTopic()) { + validTopic = true + break + } + } + // track if the Iwant / Ihave messages are from a valid Topic + var validIhave bool = false + for _, msgs := range ihave { + if trw.traceMessage(msgs.GetTopic()) { + validIhave = true + break + } + } + // check if there was any of the Iwant msgs + var validIwant bool = false + if len(iwant) > 0 { + validIwant = true + } + // trace the msgs if any of the flags was triggered + if validIhave || validIwant || validTopic { + if trw.lp2pTracer != nil { + trw.lp2pTracer.Trace(evt) + } + if trw.lotusTracer != nil { + trw.lotusTracer.Trace(evt) + } + } case pubsub_pb.TraceEvent_DROP_RPC: stats.Record(context.TODO(), metrics.PubsubDropRPC.M(1)) } diff --git a/node/modules/tracer/elasticsearch_transport.go b/node/modules/tracer/elasticsearch_transport.go index 1f6f9a157..3cc50b388 100644 --- a/node/modules/tracer/elasticsearch_transport.go +++ b/node/modules/tracer/elasticsearch_transport.go @@ -4,15 +4,19 @@ import ( "context" "encoding/json" "fmt" + "bytes" "net/url" - "strings" + "time" "github.com/elastic/go-elasticsearch/v7" - "github.com/elastic/go-elasticsearch/v7/esapi" + "github.com/elastic/go-elasticsearch/v7/esutil" ) const ( ElasticSearchDefaultIndex = "lotus-pubsub" + flushInterval = 10 * time.Second + flushBytes = 1024 * 1024 // MB + esWorkers = 2 // TODO: hardcoded ) func NewElasticSearchTransport(connectionString string, elasticsearchIndex string) (TracerTransport, error) { @@ -30,10 +34,10 @@ func NewElasticSearchTransport(connectionString string, elasticsearchIndex strin }, Username: username, Password: password, + Transport: &FastHttpTransport{}, } es, err := elasticsearch.NewClient(cfg) - if err != nil { return nil, err } @@ -45,14 +49,31 @@ func NewElasticSearchTransport(connectionString string, elasticsearchIndex strin esIndex = ElasticSearchDefaultIndex } + // Create the BulkIndexer to batch ES trace submission + bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ + Index: esIndex, + Client: es, + NumWorkers: esWorkers, + FlushBytes: int(flushBytes), + FlushInterval: flushInterval, + OnError: func(ctx context.Context, err error) { + log.Errorf("Error persisting queries %s", err.Error()) + }, + }) + if err != nil { + return nil, err + } + return &elasticSearchTransport{ - cl: es, + cl: es, + bi: bi, esIndex: esIndex, }, nil } type elasticSearchTransport struct { cl *elasticsearch.Client + bi esutil.BulkIndexer esIndex string } @@ -72,26 +93,18 @@ func (est *elasticSearchTransport) Transport(evt TracerTransportEvent) error { return fmt.Errorf("error while marshaling event: %s", err) } - req := esapi.IndexRequest{ - Index: est.esIndex, - Body: strings.NewReader(string(jsonEvt)), - Refresh: "true", - } - - // Perform the request with the client. - res, err := req.Do(context.Background(), est.cl) - if err != nil { - return err - } - - err = res.Body.Close() - if err != nil { - return err - } - - if res.IsError() { - return fmt.Errorf("[%s] Error indexing document ID=%s", res.Status(), req.DocumentID) - } - - return nil + return est.bi.Add( + context.Background(), + esutil.BulkIndexerItem{ + Action: "index", + Body: bytes.NewReader(jsonEvt), + OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) { + if err != nil { + log.Errorf("unable to submit trace - %s", err) + } else { + log.Errorf("unable to submit trace %s: %s", res.Error.Type, res.Error.Reason) + } + }, + }, + ) } diff --git a/node/modules/tracer/fasthttp_transport.go b/node/modules/tracer/fasthttp_transport.go new file mode 100644 index 000000000..b17bd8132 --- /dev/null +++ b/node/modules/tracer/fasthttp_transport.go @@ -0,0 +1,74 @@ +package tracer + +import ( + "io/ioutil" + "net/http" + "strings" + + "github.com/valyala/fasthttp" +) + +// Transport implements the elastictransport interface with +// the github.com/valyala/fasthttp HTTP client. +type FastHttpTransport struct{} + +// RoundTrip performs the request and returns a response or error +func (t *FastHttpTransport) RoundTrip(req *http.Request) (*http.Response, error) { + freq := fasthttp.AcquireRequest() + defer fasthttp.ReleaseRequest(freq) + + fres := fasthttp.AcquireResponse() + defer fasthttp.ReleaseResponse(fres) + + t.copyRequest(freq, req) + + err := fasthttp.Do(freq, fres) + if err != nil { + return nil, err + } + + res := &http.Response{Header: make(http.Header)} + t.copyResponse(res, fres) + + return res, nil +} + +// copyRequest converts a http.Request to fasthttp.Request +func (t *FastHttpTransport) copyRequest(dst *fasthttp.Request, src *http.Request) *fasthttp.Request { + if src.Method == "GET" && src.Body != nil { + src.Method = "POST" + } + + dst.SetHost(src.Host) + dst.SetRequestURI(src.URL.String()) + + dst.Header.SetRequestURI(src.URL.String()) + dst.Header.SetMethod(src.Method) + + for k, vv := range src.Header { + for _, v := range vv { + dst.Header.Set(k, v) + } + } + + if src.Body != nil { + dst.SetBodyStream(src.Body, -1) + } + + return dst +} + +// copyResponse converts a http.Response to fasthttp.Response +func (t *FastHttpTransport) copyResponse(dst *http.Response, src *fasthttp.Response) *http.Response { + dst.StatusCode = src.StatusCode() + + src.Header.VisitAll(func(k, v []byte) { + dst.Header.Set(string(k), string(v)) + }) + + // Cast to a string to make a copy seeing as src.Body() won't + // be valid after the response is released back to the pool (fasthttp.ReleaseResponse). + dst.Body = ioutil.NopCloser(strings.NewReader(string(src.Body()))) + + return dst +} From d400d33b1e7c87e36956f24a65010d3f40cd8cc6 Mon Sep 17 00:00:00 2001 From: cortze Date: Wed, 17 May 2023 14:44:19 +0200 Subject: [PATCH 2/6] go format code changes --- node/modules/lp2p/pubsub.go | 8 ++++---- node/modules/tracer/fasthttp_transport.go | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/node/modules/lp2p/pubsub.go b/node/modules/lp2p/pubsub.go index 038c129ac..fcee63192 100644 --- a/node/modules/lp2p/pubsub.go +++ b/node/modules/lp2p/pubsub.go @@ -381,9 +381,9 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) { allowTopics = append(allowTopics, drandTopics...) options = append(options, pubsub.WithSubscriptionFilter( - pubsub.WrapLimitSubscriptionFilter( - pubsub.NewAllowlistSubscriptionFilter(allowTopics...), - 100))) + pubsub.WrapLimitSubscriptionFilter( + pubsub.NewAllowlistSubscriptionFilter(allowTopics...), + 100))) var transports []tracer.TracerTransport if in.Cfg.JsonTracer != "" { @@ -505,7 +505,7 @@ func (trw *tracerWrapper) Trace(evt *pubsub_pb.TraceEvent) { case pubsub_pb.TraceEvent_DELIVER_MESSAGE: stats.Record(context.TODO(), metrics.PubsubDeliverMessage.M(1)) - + case pubsub_pb.TraceEvent_REJECT_MESSAGE: stats.Record(context.TODO(), metrics.PubsubRejectMessage.M(1)) if trw.traceMessage(evt.GetRejectMessage().GetTopic()) { diff --git a/node/modules/tracer/fasthttp_transport.go b/node/modules/tracer/fasthttp_transport.go index b17bd8132..e7e3b8331 100644 --- a/node/modules/tracer/fasthttp_transport.go +++ b/node/modules/tracer/fasthttp_transport.go @@ -1,7 +1,7 @@ package tracer import ( - "io/ioutil" + "io" "net/http" "strings" @@ -68,7 +68,7 @@ func (t *FastHttpTransport) copyResponse(dst *http.Response, src *fasthttp.Respo // Cast to a string to make a copy seeing as src.Body() won't // be valid after the response is released back to the pool (fasthttp.ReleaseResponse). - dst.Body = ioutil.NopCloser(strings.NewReader(string(src.Body()))) + dst.Body = io.NopCloser(strings.NewReader(string(src.Body()))) return dst } From fa8792a65e13b22bcfc3d0d8adf61101ebb59a8a Mon Sep 17 00:00:00 2001 From: cortze Date: Wed, 17 May 2023 14:58:53 +0200 Subject: [PATCH 3/6] make CI happpy with format --- node/modules/tracer/elasticsearch_transport.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/node/modules/tracer/elasticsearch_transport.go b/node/modules/tracer/elasticsearch_transport.go index 3cc50b388..05f9d748b 100644 --- a/node/modules/tracer/elasticsearch_transport.go +++ b/node/modules/tracer/elasticsearch_transport.go @@ -1,10 +1,10 @@ package tracer import ( + "bytes" "context" "encoding/json" "fmt" - "bytes" "net/url" "time" @@ -32,8 +32,8 @@ func NewElasticSearchTransport(connectionString string, elasticsearchIndex strin Addresses: []string{ conUrl.Scheme + "://" + conUrl.Host, }, - Username: username, - Password: password, + Username: username, + Password: password, Transport: &FastHttpTransport{}, } @@ -65,15 +65,15 @@ func NewElasticSearchTransport(connectionString string, elasticsearchIndex strin } return &elasticSearchTransport{ - cl: es, - bi: bi, + cl: es, + bi: bi, esIndex: esIndex, }, nil } type elasticSearchTransport struct { cl *elasticsearch.Client - bi esutil.BulkIndexer + bi esutil.BulkIndexer esIndex string } From bb92984dcad19f1d9cd22a4eab3d7340ebc40f83 Mon Sep 17 00:00:00 2001 From: cortze Date: Mon, 22 May 2023 09:13:10 +0200 Subject: [PATCH 4/6] tracer: simplify the valid Iwant logic --- node/modules/lp2p/pubsub.go | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/node/modules/lp2p/pubsub.go b/node/modules/lp2p/pubsub.go index fcee63192..d0381c5dc 100644 --- a/node/modules/lp2p/pubsub.go +++ b/node/modules/lp2p/pubsub.go @@ -609,10 +609,7 @@ func (trw *tracerWrapper) Trace(evt *pubsub_pb.TraceEvent) { } } // check if we have any of iwant msgs (it doesn't classify per topic - just msg.ID) - var validIwant bool = false - if len(iwant) > 0 { - validIwant = true - } + validIwant := len(iwant) > 0 // trace the event if any of the flags was triggered if validIhave || validIwant || validTopic { @@ -648,10 +645,8 @@ func (trw *tracerWrapper) Trace(evt *pubsub_pb.TraceEvent) { } } // check if there was any of the Iwant msgs - var validIwant bool = false - if len(iwant) > 0 { - validIwant = true - } + validIwant := len(iwant) > 0 + // trace the msgs if any of the flags was triggered if validIhave || validIwant || validTopic { if trw.lp2pTracer != nil { From 42d2f4d7e48104c4b8c6f19720e4eef369976442 Mon Sep 17 00:00:00 2001 From: cortze Date: Mon, 22 May 2023 09:43:07 +0200 Subject: [PATCH 5/6] tracer: move es transport to net/http + format --- node/modules/lp2p/pubsub.go | 2 +- .../modules/tracer/elasticsearch_transport.go | 3 +- node/modules/tracer/fasthttp_transport.go | 74 ------------------- 3 files changed, 3 insertions(+), 76 deletions(-) delete mode 100644 node/modules/tracer/fasthttp_transport.go diff --git a/node/modules/lp2p/pubsub.go b/node/modules/lp2p/pubsub.go index d0381c5dc..33a03f844 100644 --- a/node/modules/lp2p/pubsub.go +++ b/node/modules/lp2p/pubsub.go @@ -645,7 +645,7 @@ func (trw *tracerWrapper) Trace(evt *pubsub_pb.TraceEvent) { } } // check if there was any of the Iwant msgs - validIwant := len(iwant) > 0 + validIwant := len(iwant) > 0 // trace the msgs if any of the flags was triggered if validIhave || validIwant || validTopic { diff --git a/node/modules/tracer/elasticsearch_transport.go b/node/modules/tracer/elasticsearch_transport.go index 05f9d748b..e54e0eba2 100644 --- a/node/modules/tracer/elasticsearch_transport.go +++ b/node/modules/tracer/elasticsearch_transport.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "fmt" + "net/http" "net/url" "time" @@ -34,7 +35,7 @@ func NewElasticSearchTransport(connectionString string, elasticsearchIndex strin }, Username: username, Password: password, - Transport: &FastHttpTransport{}, + Transport: &http.Transport{}, } es, err := elasticsearch.NewClient(cfg) diff --git a/node/modules/tracer/fasthttp_transport.go b/node/modules/tracer/fasthttp_transport.go deleted file mode 100644 index e7e3b8331..000000000 --- a/node/modules/tracer/fasthttp_transport.go +++ /dev/null @@ -1,74 +0,0 @@ -package tracer - -import ( - "io" - "net/http" - "strings" - - "github.com/valyala/fasthttp" -) - -// Transport implements the elastictransport interface with -// the github.com/valyala/fasthttp HTTP client. -type FastHttpTransport struct{} - -// RoundTrip performs the request and returns a response or error -func (t *FastHttpTransport) RoundTrip(req *http.Request) (*http.Response, error) { - freq := fasthttp.AcquireRequest() - defer fasthttp.ReleaseRequest(freq) - - fres := fasthttp.AcquireResponse() - defer fasthttp.ReleaseResponse(fres) - - t.copyRequest(freq, req) - - err := fasthttp.Do(freq, fres) - if err != nil { - return nil, err - } - - res := &http.Response{Header: make(http.Header)} - t.copyResponse(res, fres) - - return res, nil -} - -// copyRequest converts a http.Request to fasthttp.Request -func (t *FastHttpTransport) copyRequest(dst *fasthttp.Request, src *http.Request) *fasthttp.Request { - if src.Method == "GET" && src.Body != nil { - src.Method = "POST" - } - - dst.SetHost(src.Host) - dst.SetRequestURI(src.URL.String()) - - dst.Header.SetRequestURI(src.URL.String()) - dst.Header.SetMethod(src.Method) - - for k, vv := range src.Header { - for _, v := range vv { - dst.Header.Set(k, v) - } - } - - if src.Body != nil { - dst.SetBodyStream(src.Body, -1) - } - - return dst -} - -// copyResponse converts a http.Response to fasthttp.Response -func (t *FastHttpTransport) copyResponse(dst *http.Response, src *fasthttp.Response) *http.Response { - dst.StatusCode = src.StatusCode() - - src.Header.VisitAll(func(k, v []byte) { - dst.Header.Set(string(k), string(v)) - }) - - // Cast to a string to make a copy seeing as src.Body() won't - // be valid after the response is released back to the pool (fasthttp.ReleaseResponse). - dst.Body = io.NopCloser(strings.NewReader(string(src.Body()))) - - return dst -} From d9a65223abab80f32c474841f8daa75db7ce0a36 Mon Sep 17 00:00:00 2001 From: cortze Date: Mon, 22 May 2023 11:59:39 +0200 Subject: [PATCH 6/6] tidy dependencies --- go.mod | 2 -- go.sum | 4 ---- 2 files changed, 6 deletions(-) diff --git a/go.mod b/go.mod index 1b091e095..f97f57fc3 100644 --- a/go.mod +++ b/go.mod @@ -144,7 +144,6 @@ require ( github.com/stretchr/testify v1.8.2 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 github.com/urfave/cli/v2 v2.16.3 - github.com/valyala/fasthttp v1.45.0 github.com/whyrusleeping/bencher v0.0.0-20190829221104-bb6607aa8bba github.com/whyrusleeping/cbor-gen v0.0.0-20230126041949-52956bd4c9aa github.com/whyrusleeping/ledger-filecoin-go v0.9.1-0.20201010031517-c3dcc1bddce4 @@ -180,7 +179,6 @@ require ( github.com/StackExchange/wmi v1.2.1 // indirect github.com/akavel/rsrc v0.8.0 // indirect github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect - github.com/andybalholm/brotli v1.0.5 // indirect github.com/armon/go-metrics v0.3.9 // indirect github.com/benbjohnson/clock v1.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect diff --git a/go.sum b/go.sum index 4b4dd2a9f..9665d04be 100644 --- a/go.sum +++ b/go.sum @@ -93,8 +93,6 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 h1:s6gZFSlWYmbqAuRjVTiNNhvNRfY2Wxp9nhfyel4rklc= github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE= -github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= -github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= @@ -1636,8 +1634,6 @@ github.com/urfave/cli/v2 v2.16.3 h1:gHoFIwpPjoyIMbJp/VFd+/vuD0dAgFK4B6DpEMFJfQk= github.com/urfave/cli/v2 v2.16.3/go.mod h1:1CNUng3PtjQMtRzJO4FMXBQvkGtuYRxxiR9xMa7jMwI= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/valyala/fasthttp v1.45.0 h1:zPkkzpIn8tdHZUrVa6PzYd0i5verqiPSkgTd3bSUcpA= -github.com/valyala/fasthttp v1.45.0/go.mod h1:k2zXd82h/7UZc3VOdJ2WaUqt1uZ/XpXAfE9i+HBC3lA= github.com/valyala/fasttemplate v1.0.1 h1:tY9CJiPnMXf1ERmG2EyK7gNUd+c6RKGD0IfU8WdUSz8= github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU=