-
Notifications
You must be signed in to change notification settings - Fork 2.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Reimplement OTEL elasticsearch exporter Signed-off-by: Pavol Loffay <[email protected]> * Fix typo in comment Signed-off-by: Pavol Loffay <[email protected]>
- Loading branch information
1 parent
7c9e1d3
commit 00b6e96
Showing
26 changed files
with
1,980 additions
and
98 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
84 changes: 84 additions & 0 deletions
84
cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/client.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
// Copyright (c) 2020 The Jaeger Authors. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package esclient | ||
|
||
import ( | ||
"bytes" | ||
"fmt" | ||
"io" | ||
|
||
"go.uber.org/zap" | ||
|
||
"github.com/jaegertracing/jaeger/pkg/es/config" | ||
) | ||
|
||
// ElasticsearchClient exposes Elasticsearch API used by Jaeger | ||
type ElasticsearchClient interface { | ||
// PutTemplate creates index template | ||
PutTemplate(name string, template io.Reader) error | ||
// Bulk submits a bulk request | ||
Bulk(bulkBody io.Reader) (*BulkResponse, error) | ||
// AddDataToBulkBuffer creates bulk item from data, index and typ and adds it to bulkBody | ||
AddDataToBulkBuffer(bulkBody *bytes.Buffer, data []byte, index, typ string) | ||
} | ||
|
||
// BulkResponse is a response returned by Elasticsearch Bulk API | ||
type BulkResponse struct { | ||
Errors bool `json:"errors"` | ||
Items []struct { | ||
Index struct { | ||
ID string `json:"_id"` | ||
Result string `json:"result"` | ||
Status int `json:"status"` | ||
Error struct { | ||
Type string `json:"type"` | ||
Reason string `json:"reason"` | ||
Cause struct { | ||
Type string `json:"type"` | ||
Reason string `json:"reason"` | ||
} `json:"caused_by"` | ||
} `json:"error"` | ||
} `json:"index"` | ||
} `json:"items"` | ||
} | ||
|
||
// NewElasticsearchClient returns an instance of Elasticsearch client | ||
func NewElasticsearchClient(params config.Configuration, logger *zap.Logger) (ElasticsearchClient, error) { | ||
roundTripper, err := config.GetHTTPRoundTripper(¶ms, logger) | ||
if err != nil { | ||
return nil, err | ||
} | ||
if params.GetVersion() == 0 { | ||
esPing := elasticsearchPing{ | ||
username: params.Username, | ||
password: params.Password, | ||
roundTripper: roundTripper, | ||
} | ||
esVersion, err := esPing.getVersion(params.Servers[0]) | ||
if err != nil { | ||
return nil, err | ||
} | ||
logger.Info("Elasticsearch detected", zap.Int("version", esVersion)) | ||
params.Version = uint(esVersion) | ||
} | ||
switch params.Version { | ||
case 5, 6: | ||
return newElasticsearch6Client(params, roundTripper) | ||
case 7: | ||
return newElasticsearch7Client(params, roundTripper) | ||
default: | ||
return nil, fmt.Errorf("could not create Elasticseach client for version %d", params.Version) | ||
} | ||
} |
63 changes: 63 additions & 0 deletions
63
cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/client_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
// Copyright (c) 2020 The Jaeger Authors. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package esclient | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
"go.uber.org/zap" | ||
|
||
"github.com/jaegertracing/jaeger/pkg/es/config" | ||
) | ||
|
||
func TestGetClient(t *testing.T) { | ||
tests := []struct { | ||
version int | ||
err string | ||
}{ | ||
{ | ||
version: 5, | ||
}, | ||
{ | ||
version: 6, | ||
}, | ||
{ | ||
version: 7, | ||
}, | ||
{ | ||
version: 1, | ||
err: "could not create Elasticseach client for version 1", | ||
}, | ||
} | ||
for _, test := range tests { | ||
t.Run(test.err, func(t *testing.T) { | ||
client, err := NewElasticsearchClient(config.Configuration{Servers: []string{""}, Version: uint(test.version)}, zap.NewNop()) | ||
if test.err == "" { | ||
require.NoError(t, err) | ||
} | ||
switch test.version { | ||
case 5, 6: | ||
assert.IsType(t, &elasticsearch6Client{}, client) | ||
case 7: | ||
assert.IsType(t, &elasticsearch7Client{}, client) | ||
default: | ||
assert.EqualError(t, err, test.err) | ||
assert.Nil(t, client) | ||
} | ||
}) | ||
} | ||
} |
87 changes: 87 additions & 0 deletions
87
cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/es6client.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
// Copyright (c) 2020 The Jaeger Authors. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package esclient | ||
|
||
import ( | ||
"bytes" | ||
"encoding/json" | ||
"fmt" | ||
"io" | ||
"net/http" | ||
|
||
elasticsearch6 "github.com/elastic/go-elasticsearch/v6" | ||
|
||
"github.com/jaegertracing/jaeger/pkg/es/config" | ||
) | ||
|
||
const ( | ||
bulkES6MetaFormat = `{"index":{"_index":"%s","_type":"%s"}}` + "\n" | ||
) | ||
|
||
type elasticsearch6Client struct { | ||
client *elasticsearch6.Client | ||
} | ||
|
||
var _ ElasticsearchClient = (*elasticsearch6Client)(nil) | ||
|
||
func newElasticsearch6Client(params config.Configuration, roundTripper http.RoundTripper) (*elasticsearch6Client, error) { | ||
client, err := elasticsearch6.NewClient(elasticsearch6.Config{ | ||
DiscoverNodesOnStart: params.Sniffer, | ||
Addresses: params.Servers, | ||
Username: params.Username, | ||
Password: params.Password, | ||
Transport: roundTripper, | ||
}) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return &elasticsearch6Client{ | ||
client: client, | ||
}, nil | ||
} | ||
|
||
func (es *elasticsearch6Client) PutTemplate(name string, body io.Reader) error { | ||
resp, err := es.client.Indices.PutTemplate(name, body) | ||
if err != nil { | ||
return err | ||
} | ||
resp.Body.Close() | ||
return nil | ||
} | ||
|
||
func (es *elasticsearch6Client) AddDataToBulkBuffer(buffer *bytes.Buffer, data []byte, index, typ string) { | ||
meta := []byte(fmt.Sprintf(bulkES6MetaFormat, index, typ)) | ||
buffer.Grow(len(meta) + len(data) + len("\n")) | ||
buffer.Write(meta) | ||
buffer.Write(data) | ||
buffer.Write([]byte("\n")) | ||
} | ||
|
||
func (es *elasticsearch6Client) Bulk(reader io.Reader) (*BulkResponse, error) { | ||
response, err := es.client.Bulk(reader) | ||
if err != nil { | ||
return nil, err | ||
} | ||
defer response.Body.Close() | ||
if response.StatusCode >= 400 { | ||
return nil, fmt.Errorf("bulk request failed with code %d", response.StatusCode) | ||
} | ||
var blk BulkResponse | ||
err = json.NewDecoder(response.Body).Decode(&blk) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return &blk, nil | ||
} |
119 changes: 119 additions & 0 deletions
119
cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/es6client_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
// Copyright (c) 2020 The Jaeger Authors. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package esclient | ||
|
||
import ( | ||
"bytes" | ||
"fmt" | ||
"io/ioutil" | ||
"net/http" | ||
"strings" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/jaegertracing/jaeger/pkg/es/config" | ||
) | ||
|
||
type mockTransport struct { | ||
Response *http.Response | ||
RoundTripFn func(req *http.Request) (*http.Response, error) | ||
} | ||
|
||
func (t *mockTransport) RoundTrip(req *http.Request) (*http.Response, error) { | ||
return t.RoundTripFn(req) | ||
} | ||
|
||
func TestES6NewClient_err(t *testing.T) { | ||
client, err := newElasticsearch6Client(config.Configuration{ | ||
Sniffer: true, | ||
Servers: []string{"$%"}, | ||
}, &http.Transport{}) | ||
require.Error(t, err) | ||
assert.Nil(t, client) | ||
} | ||
|
||
func TestES6PutTemplateES6Client(t *testing.T) { | ||
mocktrans := &mockTransport{ | ||
Response: &http.Response{ | ||
StatusCode: http.StatusOK, | ||
Body: ioutil.NopCloser(strings.NewReader(`{}`)), | ||
}, | ||
} | ||
mocktrans.RoundTripFn = func(req *http.Request) (*http.Response, error) { return mocktrans.Response, nil } | ||
client, err := newElasticsearch6Client(config.Configuration{}, mocktrans) | ||
require.NoError(t, err) | ||
assert.NotNil(t, client) | ||
err = client.PutTemplate("foo", strings.NewReader("bar")) | ||
require.NoError(t, err) | ||
} | ||
|
||
func TestES6AddDataToBulk(t *testing.T) { | ||
client, err := newElasticsearch6Client(config.Configuration{}, &http.Transport{}) | ||
require.NoError(t, err) | ||
assert.NotNil(t, client) | ||
|
||
buf := &bytes.Buffer{} | ||
client.AddDataToBulkBuffer(buf, []byte("data"), "foo", "bar") | ||
assert.Equal(t, "{\"index\":{\"_index\":\"foo\",\"_type\":\"bar\"}}\ndata\n", buf.String()) | ||
} | ||
|
||
func TestES6Bulk(t *testing.T) { | ||
tests := []struct { | ||
resp *http.Response | ||
err string | ||
}{ | ||
{ | ||
resp: &http.Response{ | ||
StatusCode: http.StatusOK, | ||
Body: ioutil.NopCloser(strings.NewReader("{}")), | ||
}, | ||
}, | ||
{ | ||
resp: &http.Response{ | ||
StatusCode: http.StatusOK, | ||
Body: ioutil.NopCloser(strings.NewReader("{#}")), | ||
}, | ||
err: "looking for beginning of object key string", | ||
}, | ||
{ | ||
resp: &http.Response{ | ||
StatusCode: http.StatusBadRequest, | ||
Body: ioutil.NopCloser(strings.NewReader("{#}")), | ||
}, | ||
err: "bulk request failed with code 400", | ||
}, | ||
} | ||
for _, test := range tests { | ||
t.Run(test.err, func(t *testing.T) { | ||
mocktrans := &mockTransport{ | ||
Response: test.resp, | ||
} | ||
mocktrans.RoundTripFn = func(req *http.Request) (*http.Response, error) { return mocktrans.Response, nil } | ||
|
||
client, err := newElasticsearch6Client(config.Configuration{}, mocktrans) | ||
require.NoError(t, err) | ||
assert.NotNil(t, client) | ||
_, err = client.Bulk(strings.NewReader("data")) | ||
if test.err != "" { | ||
fmt.Println() | ||
assert.Contains(t, err.Error(), test.err) | ||
} else { | ||
require.NoError(t, err) | ||
} | ||
}) | ||
} | ||
} |
Oops, something went wrong.