2021-09-15 12:50:27 +00:00
|
|
|
package tracer
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"encoding/json"
|
|
|
|
"fmt"
|
|
|
|
"strings"
|
|
|
|
|
|
|
|
"github.com/elastic/go-elasticsearch/v7"
|
|
|
|
"github.com/elastic/go-elasticsearch/v7/esapi"
|
|
|
|
)
|
|
|
|
|
2021-09-15 13:11:34 +00:00
|
|
|
const (
|
|
|
|
ElasticSearch_INDEX = "pubsub"
|
2021-09-15 14:03:51 +00:00
|
|
|
|
|
|
|
ElasticSearch_DOC_LOTUS = "doc_lotus"
|
|
|
|
ElasticSearch_DOC_PUBSUB = "doc_pubsub"
|
2021-09-15 13:11:34 +00:00
|
|
|
)
|
2021-09-15 12:50:27 +00:00
|
|
|
|
2021-09-15 12:58:36 +00:00
|
|
|
func NewElasticSearchTransport() (TracerTransport, error) {
|
2021-09-15 12:50:27 +00:00
|
|
|
es, err := elasticsearch.NewDefaultClient()
|
|
|
|
|
|
|
|
if err != nil {
|
2021-09-15 12:58:36 +00:00
|
|
|
return nil, err
|
2021-09-15 12:50:27 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return &elasticSearchTransport{
|
|
|
|
cl: es,
|
2021-09-15 12:58:36 +00:00
|
|
|
}, nil
|
2021-09-15 12:50:27 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
type elasticSearchTransport struct {
|
|
|
|
cl *elasticsearch.Client
|
|
|
|
}
|
|
|
|
|
2021-09-15 14:03:51 +00:00
|
|
|
func (est *elasticSearchTransport) Transport(event TracerTransportEvent) error {
|
|
|
|
var e interface{}
|
|
|
|
var docId string
|
|
|
|
if event.lotusTraceEvent != nil {
|
|
|
|
e = *event.lotusTraceEvent
|
|
|
|
docId = ElasticSearch_DOC_LOTUS
|
|
|
|
} else if event.pubsubTraceEvent != nil {
|
|
|
|
e = *event.pubsubTraceEvent
|
|
|
|
docId = ElasticSearch_DOC_PUBSUB
|
|
|
|
} else {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
jsonEvent, err := json.Marshal(e)
|
|
|
|
if err != nil {
|
2021-09-15 14:05:44 +00:00
|
|
|
return fmt.Errorf("error while marshaling event: %s", err)
|
2021-09-15 14:03:51 +00:00
|
|
|
}
|
|
|
|
|
2021-09-15 12:50:27 +00:00
|
|
|
req := esapi.IndexRequest{
|
2021-09-15 13:11:34 +00:00
|
|
|
Index: ElasticSearch_INDEX,
|
2021-09-15 14:03:51 +00:00
|
|
|
DocumentID: docId,
|
2021-09-15 12:50:27 +00:00
|
|
|
Body: strings.NewReader(string(jsonEvent)),
|
|
|
|
Refresh: "true",
|
|
|
|
}
|
|
|
|
|
|
|
|
// Perform the request with the client.
|
|
|
|
res, err := req.Do(context.Background(), est.cl)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
defer res.Body.Close()
|
|
|
|
|
|
|
|
if res.IsError() {
|
|
|
|
return fmt.Errorf("[%s] Error indexing document ID=%s", res.Status(), req.DocumentID)
|
|
|
|
} else {
|
|
|
|
// Deserialize the response into a map.
|
|
|
|
var r map[string]interface{}
|
|
|
|
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|