package tracer import ( "bytes" "context" "encoding/json" "fmt" "net/http" "net/url" "time" "github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v7/esutil" ) const ( ElasticSearchDefaultIndex = "lotus-pubsub" flushInterval = 10 * time.Second flushBytes = 1024 * 1024 // MB esWorkers = 2 // TODO: hardcoded ) func NewElasticSearchTransport(connectionString string, elasticsearchIndex string) (TracerTransport, error) { conUrl, err := url.Parse(connectionString) if err != nil { return nil, err } username := conUrl.User.Username() password, _ := conUrl.User.Password() cfg := elasticsearch.Config{ Addresses: []string{ conUrl.Scheme + "://" + conUrl.Host, }, Username: username, Password: password, Transport: &http.Transport{}, } es, err := elasticsearch.NewClient(cfg) if err != nil { return nil, err } var esIndex string if elasticsearchIndex != "" { esIndex = elasticsearchIndex } else { esIndex = ElasticSearchDefaultIndex } // Create the BulkIndexer to batch ES trace submission bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ Index: esIndex, Client: es, NumWorkers: esWorkers, FlushBytes: int(flushBytes), FlushInterval: flushInterval, OnError: func(ctx context.Context, err error) { log.Errorf("Error persisting queries %s", err.Error()) }, }) if err != nil { return nil, err } return &elasticSearchTransport{ cl: es, bi: bi, esIndex: esIndex, }, nil } type elasticSearchTransport struct { cl *elasticsearch.Client bi esutil.BulkIndexer esIndex string } func (est *elasticSearchTransport) Transport(evt TracerTransportEvent) error { var e interface{} if evt.lotusTraceEvent != nil { e = *evt.lotusTraceEvent } else if evt.pubsubTraceEvent != nil { e = *evt.pubsubTraceEvent } else { return nil } jsonEvt, err := json.Marshal(e) if err != nil { return fmt.Errorf("error while marshaling event: %s", err) } return est.bi.Add( context.Background(), esutil.BulkIndexerItem{ Action: "index", Body: bytes.NewReader(jsonEvt), OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) { if err != nil { log.Errorf("unable to submit trace - %s", err) } else { log.Errorf("unable to submit trace %s: %s", res.Error.Type, res.Error.Reason) } }, }, ) }