From c2e2725855b4c6f47c471700025983df476ea608 Mon Sep 17 00:00:00 2001 From: cortze Date: Mon, 24 Apr 2023 17:34:56 +0200 Subject: [PATCH] update traces, es transport with batches and fasthttp --- go.mod | 2 + go.sum | 4 + node/modules/lp2p/pubsub.go | 204 ++++++++++++++---- .../modules/tracer/elasticsearch_transport.go | 65 +++--- node/modules/tracer/fasthttp_transport.go | 74 +++++++ 5 files changed, 276 insertions(+), 73 deletions(-) create mode 100644 node/modules/tracer/fasthttp_transport.go diff --git a/go.mod b/go.mod index f97f57fc3..1b091e095 100644 --- a/go.mod +++ b/go.mod @@ -144,6 +144,7 @@ require ( github.com/stretchr/testify v1.8.2 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 github.com/urfave/cli/v2 v2.16.3 + github.com/valyala/fasthttp v1.45.0 github.com/whyrusleeping/bencher v0.0.0-20190829221104-bb6607aa8bba github.com/whyrusleeping/cbor-gen v0.0.0-20230126041949-52956bd4c9aa github.com/whyrusleeping/ledger-filecoin-go v0.9.1-0.20201010031517-c3dcc1bddce4 @@ -179,6 +180,7 @@ require ( github.com/StackExchange/wmi v1.2.1 // indirect github.com/akavel/rsrc v0.8.0 // indirect github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect + github.com/andybalholm/brotli v1.0.5 // indirect github.com/armon/go-metrics v0.3.9 // indirect github.com/benbjohnson/clock v1.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect diff --git a/go.sum b/go.sum index 9665d04be..4b4dd2a9f 100644 --- a/go.sum +++ b/go.sum @@ -93,6 +93,8 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 h1:s6gZFSlWYmbqAuRjVTiNNhvNRfY2Wxp9nhfyel4rklc= github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE= +github.com/andybalholm/brotli v1.0.5 
h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= +github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= @@ -1634,6 +1636,8 @@ github.com/urfave/cli/v2 v2.16.3 h1:gHoFIwpPjoyIMbJp/VFd+/vuD0dAgFK4B6DpEMFJfQk= github.com/urfave/cli/v2 v2.16.3/go.mod h1:1CNUng3PtjQMtRzJO4FMXBQvkGtuYRxxiR9xMa7jMwI= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/valyala/fasthttp v1.45.0 h1:zPkkzpIn8tdHZUrVa6PzYd0i5verqiPSkgTd3bSUcpA= +github.com/valyala/fasthttp v1.45.0/go.mod h1:k2zXd82h/7UZc3VOdJ2WaUqt1uZ/XpXAfE9i+HBC3lA= github.com/valyala/fasttemplate v1.0.1 h1:tY9CJiPnMXf1ERmG2EyK7gNUd+c6RKGD0IfU8WdUSz8= github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU= diff --git a/node/modules/lp2p/pubsub.go b/node/modules/lp2p/pubsub.go index 5376d93c8..038c129ac 100644 --- a/node/modules/lp2p/pubsub.go +++ b/node/modules/lp2p/pubsub.go @@ -381,9 +381,9 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) { allowTopics = append(allowTopics, drandTopics...) 
options = append(options, pubsub.WithSubscriptionFilter( - pubsub.WrapLimitSubscriptionFilter( - pubsub.NewAllowlistSubscriptionFilter(allowTopics...), - 100))) + pubsub.WrapLimitSubscriptionFilter( + pubsub.NewAllowlistSubscriptionFilter(allowTopics...), + 100))) var transports []tracer.TracerTransport if in.Cfg.JsonTracer != "" { @@ -394,6 +394,20 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) { transports = append(transports, jsonTransport) } + + tps := make([]string, 0) // range of topics that will be submited to the traces + addTopicToList := func(topicList []string, newTopic string) []string { + // check if the topic is already in the list + for _, tp := range topicList { + if tp == newTopic { + return topicList + } + } + // add it otherwise + return append(topicList, newTopic) + } + + // tracer if in.Cfg.ElasticSearchTracer != "" { elasticSearchTransport, err := tracer.NewElasticSearchTransport( in.Cfg.ElasticSearchTracer, @@ -403,7 +417,9 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) { return nil, err } transports = append(transports, elasticSearchTransport) + tps = addTopicToList(tps, build.BlocksTopic(in.Nn)) } + lt := tracer.NewLotusTracer(transports, in.Host.ID(), in.Cfg.TracerSourceAuth) // tracer @@ -412,28 +428,25 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) { if err != nil { return nil, err } - pi, err := peer.AddrInfoFromP2pAddr(a) if err != nil { return nil, err } - tr, err := pubsub.NewRemoteTracer(context.TODO(), in.Host, *pi) if err != nil { return nil, err } - pst := newPeerScoreTracker(lt, in.Sk) - trw := newTracerWrapper(tr, lt, build.BlocksTopic(in.Nn)) + trw := newTracerWrapper(tr, lt, tps...) 
options = append(options, pubsub.WithEventTracer(trw)) options = append(options, pubsub.WithPeerScoreInspect(pst.UpdatePeerScore, 10*time.Second)) } else { // still instantiate a tracer for collecting metrics - trw := newTracerWrapper(nil, lt) - options = append(options, pubsub.WithEventTracer(trw)) - pst := newPeerScoreTracker(lt, in.Sk) + trw := newTracerWrapper(nil, lt, tps...) + + options = append(options, pubsub.WithEventTracer(trw)) options = append(options, pubsub.WithPeerScoreInspect(pst.UpdatePeerScore, 10*time.Second)) } @@ -457,7 +470,6 @@ func newTracerWrapper( topicsMap[topic] = struct{}{} } } - return &tracerWrapper{lp2pTracer: lp2pTracer, lotusTracer: lotusTracer, topics: topicsMap} } @@ -486,71 +498,169 @@ func (trw *tracerWrapper) Trace(evt *pubsub_pb.TraceEvent) { if trw.lp2pTracer != nil { trw.lp2pTracer.Trace(evt) } - if trw.lotusTracer != nil { trw.lotusTracer.Trace(evt) } } + case pubsub_pb.TraceEvent_DELIVER_MESSAGE: stats.Record(context.TODO(), metrics.PubsubDeliverMessage.M(1)) - if trw.traceMessage(evt.GetDeliverMessage().GetTopic()) { - if trw.lp2pTracer != nil { - trw.lp2pTracer.Trace(evt) - } - - if trw.lotusTracer != nil { - trw.lotusTracer.Trace(evt) - } - } + case pubsub_pb.TraceEvent_REJECT_MESSAGE: stats.Record(context.TODO(), metrics.PubsubRejectMessage.M(1)) if trw.traceMessage(evt.GetRejectMessage().GetTopic()) { if trw.lp2pTracer != nil { trw.lp2pTracer.Trace(evt) } + if trw.lotusTracer != nil { + trw.lotusTracer.Trace(evt) + } + } + + case pubsub_pb.TraceEvent_DUPLICATE_MESSAGE: + stats.Record(context.TODO(), metrics.PubsubDuplicateMessage.M(1)) + if trw.traceMessage(evt.GetDuplicateMessage().GetTopic()) { + if trw.lp2pTracer != nil { + trw.lp2pTracer.Trace(evt) + } + if trw.lotusTracer != nil { + trw.lotusTracer.Trace(evt) + } + } + + case pubsub_pb.TraceEvent_JOIN: + if trw.traceMessage(evt.GetJoin().GetTopic()) { + if trw.lp2pTracer != nil { + trw.lp2pTracer.Trace(evt) + } + if trw.lotusTracer != nil { + 
trw.lotusTracer.Trace(evt) + } + } + case pubsub_pb.TraceEvent_LEAVE: + if trw.traceMessage(evt.GetLeave().GetTopic()) { + if trw.lp2pTracer != nil { + trw.lp2pTracer.Trace(evt) + } + if trw.lotusTracer != nil { + trw.lotusTracer.Trace(evt) + } + } + + case pubsub_pb.TraceEvent_GRAFT: + if trw.traceMessage(evt.GetGraft().GetTopic()) { + if trw.lp2pTracer != nil { + trw.lp2pTracer.Trace(evt) + } if trw.lotusTracer != nil { trw.lotusTracer.Trace(evt) } } - case pubsub_pb.TraceEvent_DUPLICATE_MESSAGE: - stats.Record(context.TODO(), metrics.PubsubDuplicateMessage.M(1)) - case pubsub_pb.TraceEvent_JOIN: - if trw.lp2pTracer != nil { - trw.lp2pTracer.Trace(evt) - } - if trw.lotusTracer != nil { - trw.lotusTracer.Trace(evt) - } - case pubsub_pb.TraceEvent_LEAVE: - if trw.lp2pTracer != nil { - trw.lp2pTracer.Trace(evt) - } - - if trw.lotusTracer != nil { - trw.lotusTracer.Trace(evt) - } - case pubsub_pb.TraceEvent_GRAFT: - if trw.lp2pTracer != nil { - trw.lp2pTracer.Trace(evt) - } - - if trw.lotusTracer != nil { - trw.lotusTracer.Trace(evt) - } case pubsub_pb.TraceEvent_PRUNE: + if trw.traceMessage(evt.GetPrune().GetTopic()) { + if trw.lp2pTracer != nil { + trw.lp2pTracer.Trace(evt) + } + if trw.lotusTracer != nil { + trw.lotusTracer.Trace(evt) + } + } + + case pubsub_pb.TraceEvent_ADD_PEER: if trw.lp2pTracer != nil { trw.lp2pTracer.Trace(evt) } - if trw.lotusTracer != nil { trw.lotusTracer.Trace(evt) } + + case pubsub_pb.TraceEvent_REMOVE_PEER: + if trw.lp2pTracer != nil { + trw.lp2pTracer.Trace(evt) + } + if trw.lotusTracer != nil { + trw.lotusTracer.Trace(evt) + } + case pubsub_pb.TraceEvent_RECV_RPC: stats.Record(context.TODO(), metrics.PubsubRecvRPC.M(1)) + // only track the RPC Calls from IWANT / IHAVE / BLOCK topic + controlRPC := evt.GetRecvRPC().GetMeta().GetControl() + ihave := controlRPC.GetIhave() + iwant := controlRPC.GetIwant() + msgsRPC := evt.GetRecvRPC().GetMeta().GetMessages() + + // check if any of the messages we are sending belong to a trackable topic + 
var validTopic bool = false + for _, topic := range msgsRPC { + if trw.traceMessage(topic.GetTopic()) { + validTopic = true + break + } + } + // track if the Iwant / Ihave messages are from a valid Topic + var validIhave bool = false + for _, msgs := range ihave { + if trw.traceMessage(msgs.GetTopic()) { + validIhave = true + break + } + } + // check if we have any of iwant msgs (it doesn't classify per topic - just msg.ID) + var validIwant bool = false + if len(iwant) > 0 { + validIwant = true + } + + // trace the event if any of the flags was triggered + if validIhave || validIwant || validTopic { + if trw.lp2pTracer != nil { + trw.lp2pTracer.Trace(evt) + } + if trw.lotusTracer != nil { + trw.lotusTracer.Trace(evt) + } + } case pubsub_pb.TraceEvent_SEND_RPC: stats.Record(context.TODO(), metrics.PubsubSendRPC.M(1)) + // only track the RPC Calls from IWANT / IHAVE / BLOCK topic + controlRPC := evt.GetSendRPC().GetMeta().GetControl() + ihave := controlRPC.GetIhave() + iwant := controlRPC.GetIwant() + msgsRPC := evt.GetSendRPC().GetMeta().GetMessages() + + // check if any of the messages we are sending belong to a trackable topic + var validTopic bool = false + for _, topic := range msgsRPC { + if trw.traceMessage(topic.GetTopic()) { + validTopic = true + break + } + } + // track if the Iwant / Ihave messages are from a valid Topic + var validIhave bool = false + for _, msgs := range ihave { + if trw.traceMessage(msgs.GetTopic()) { + validIhave = true + break + } + } + // check if there was any of the Iwant msgs + var validIwant bool = false + if len(iwant) > 0 { + validIwant = true + } + // trace the msgs if any of the flags was triggered + if validIhave || validIwant || validTopic { + if trw.lp2pTracer != nil { + trw.lp2pTracer.Trace(evt) + } + if trw.lotusTracer != nil { + trw.lotusTracer.Trace(evt) + } + } case pubsub_pb.TraceEvent_DROP_RPC: stats.Record(context.TODO(), metrics.PubsubDropRPC.M(1)) } diff --git a/node/modules/tracer/elasticsearch_transport.go 
b/node/modules/tracer/elasticsearch_transport.go index 1f6f9a157..3cc50b388 100644 --- a/node/modules/tracer/elasticsearch_transport.go +++ b/node/modules/tracer/elasticsearch_transport.go @@ -4,15 +4,19 @@ import ( "context" "encoding/json" "fmt" + "bytes" "net/url" - "strings" + "time" "github.com/elastic/go-elasticsearch/v7" - "github.com/elastic/go-elasticsearch/v7/esapi" + "github.com/elastic/go-elasticsearch/v7/esutil" ) const ( ElasticSearchDefaultIndex = "lotus-pubsub" + flushInterval = 10 * time.Second + flushBytes = 1024 * 1024 // MB + esWorkers = 2 // TODO: hardcoded ) func NewElasticSearchTransport(connectionString string, elasticsearchIndex string) (TracerTransport, error) { @@ -30,10 +34,10 @@ func NewElasticSearchTransport(connectionString string, elasticsearchIndex strin }, Username: username, Password: password, + Transport: &FastHttpTransport{}, } es, err := elasticsearch.NewClient(cfg) - if err != nil { return nil, err } @@ -45,14 +49,31 @@ func NewElasticSearchTransport(connectionString string, elasticsearchIndex strin esIndex = ElasticSearchDefaultIndex } + // Create the BulkIndexer to batch ES trace submission + bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ + Index: esIndex, + Client: es, + NumWorkers: esWorkers, + FlushBytes: int(flushBytes), + FlushInterval: flushInterval, + OnError: func(ctx context.Context, err error) { + log.Errorf("Error persisting queries %s", err.Error()) + }, + }) + if err != nil { + return nil, err + } + return &elasticSearchTransport{ - cl: es, + cl: es, + bi: bi, esIndex: esIndex, }, nil } type elasticSearchTransport struct { cl *elasticsearch.Client + bi esutil.BulkIndexer esIndex string } @@ -72,26 +93,18 @@ func (est *elasticSearchTransport) Transport(evt TracerTransportEvent) error { return fmt.Errorf("error while marshaling event: %s", err) } - req := esapi.IndexRequest{ - Index: est.esIndex, - Body: strings.NewReader(string(jsonEvt)), - Refresh: "true", - } - - // Perform the request with the 
client. - res, err := req.Do(context.Background(), est.cl) - if err != nil { - return err - } - - err = res.Body.Close() - if err != nil { - return err - } - - if res.IsError() { - return fmt.Errorf("[%s] Error indexing document ID=%s", res.Status(), req.DocumentID) - } - - return nil + return est.bi.Add( + context.Background(), + esutil.BulkIndexerItem{ + Action: "index", + Body: bytes.NewReader(jsonEvt), + OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) { + if err != nil { + log.Errorf("unable to submit trace - %s", err) + } else { + log.Errorf("unable to submit trace %s: %s", res.Error.Type, res.Error.Reason) + } + }, + }, + ) } diff --git a/node/modules/tracer/fasthttp_transport.go b/node/modules/tracer/fasthttp_transport.go new file mode 100644 index 000000000..b17bd8132 --- /dev/null +++ b/node/modules/tracer/fasthttp_transport.go @@ -0,0 +1,74 @@ +package tracer + +import ( + "io/ioutil" + "net/http" + "strings" + + "github.com/valyala/fasthttp" +) + +// Transport implements the elastictransport interface with +// the github.com/valyala/fasthttp HTTP client. 
+type FastHttpTransport struct{}
+
+// RoundTrip performs the request through fasthttp.Do and returns the
+// converted response, or the error reported by fasthttp.Do.
+// NOTE(review): req.Context() is not consulted, so cancellation and deadlines
+// set by the caller are not propagated to fasthttp.
+func (t *FastHttpTransport) RoundTrip(req *http.Request) (*http.Response, error) {
+	freq := fasthttp.AcquireRequest()
+	defer fasthttp.ReleaseRequest(freq)
+	fres := fasthttp.AcquireResponse()
+	defer fasthttp.ReleaseResponse(fres)
+	t.copyRequest(freq, req)
+	if err := fasthttp.Do(freq, fres); err != nil {
+		return nil, err
+	}
+
+	// The response is copied before the deferred release returns fres to the pool.
+	res := &http.Response{Header: make(http.Header)}
+	t.copyResponse(res, fres)
+	return res, nil
+}
+
+// copyRequest fills dst from src and returns dst. src itself is never
+// modified, as the http.RoundTripper contract requires.
+func (t *FastHttpTransport) copyRequest(dst *fasthttp.Request, src *http.Request) *fasthttp.Request {
+	// A GET that carries a body is sent as POST; decide on a local copy of
+	// the method so the caller's request keeps its original verb.
+	method := src.Method
+	if method == "GET" && src.Body != nil {
+		method = "POST"
+	}
+	dst.SetHost(src.Host)
+	dst.SetRequestURI(src.URL.String())
+	dst.Header.SetRequestURI(src.URL.String())
+	dst.Header.SetMethod(method)
+
+	// Add (not Set) each value: Set would leave only the last value of a
+	// multi-valued header.
+	for k, vv := range src.Header {
+		for _, v := range vv {
+			dst.Header.Add(k, v)
+		}
+	}
+	if src.Body != nil {
+		dst.SetBodyStream(src.Body, -1)
+	}
+	return dst
+}
+
+// copyResponse fills dst from src and returns dst.
+func (t *FastHttpTransport) copyResponse(dst *http.Response, src *fasthttp.Response) *http.Response {
+	dst.StatusCode = src.StatusCode()
+	// Add (not Set) so that repeated response headers keep every value.
+	src.Header.VisitAll(func(k, v []byte) {
+		dst.Header.Add(string(k), string(v))
+	})
+
+	// Copy the body via string(): src.Body() is not valid once the response
+	// is released back to the pool (fasthttp.ReleaseResponse).
+	dst.Body = ioutil.NopCloser(strings.NewReader(string(src.Body())))
+	return dst
+}