Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exporter/elasticexporter: add Elastic APM exporter #240

Merged
merged 1 commit into from
Jun 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/otelcontribcol/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsxrayexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/azuremonitorexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/carbonexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/honeycombexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/jaegerthrifthttpexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kinesisexporter"
Expand Down Expand Up @@ -101,6 +102,7 @@ func components() (config.Factories, error) {
&jaegerthrifthttpexporter.Factory{},
&lightstepexporter.Factory{},
&splunkhecexporter.Factory{},
&elasticexporter.Factory{},
}
for _, exp := range factories.Exporters {
exporters = append(exporters, exp)
Expand Down
1 change: 1 addition & 0 deletions exporter/elasticexporter/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
22 changes: 22 additions & 0 deletions exporter/elasticexporter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Elastic APM Exporter

This exporter supports sending OpenTelemetry data to [Elastic APM](https://www.elastic.co/apm)

Configuration options:

- `apm_server_url` (required): Elastic APM Server URL.
- `api_key` (optional): credential for API Key authorization, if enabled in Elastic APM Server.
- `secret_token` (optional): credential for Secret Token authorization, if enabled in Elastic APM Server.
- `ca_file` (optional): root Certificate Authority (CA) certificate, for verifying the server's identity, if TLS is enabled.
- `cert_file` (optional): client TLS certificate.
- `key_file` (optional): client TLS key.
- `insecure` (optional): disable verification of the server's identity, if TLS is enabled.

Example:

```yaml
exporters:
elastic:
apm_server_url: "https://elasticapm.example.com"
secret_token: "hunter2"
```
51 changes: 51 additions & 0 deletions exporter/elasticexporter/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package elasticexporter

import (
"errors"

"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/configtls"
)

// Config defines configuration for Elastic APM exporter.
type Config struct {
configmodels.ExporterSettings `mapstructure:",squash"`
configtls.TLSClientSetting `mapstructure:",squash"`

// APMServerURLs holds the APM Server URL.
//
// This is required.
APMServerURL string `mapstructure:"apm_server_url"`

// APIKey holds an optional API Key for authorization.
//
// https://www.elastic.co/guide/en/apm/server/7.7/api-key-settings.html
APIKey string `mapstructure:"api_key"`

// SecretToken holds the optional secret token for authorization.
//
// https://www.elastic.co/guide/en/apm/server/7.7/secret-token.html
SecretToken string `mapstructure:"secret_token"`
}

// Validate validates the configuration.
func (cfg Config) Validate() error {
if cfg.APMServerURL == "" {
return errors.New("APMServerURL must be specified")
}
return nil
}
109 changes: 109 additions & 0 deletions exporter/elasticexporter/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package elasticexporter

import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"path"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer/pdata"
"go.uber.org/zap"
)

func TestLoadConfig(t *testing.T) {
factories, err := config.ExampleComponents()
require.NoError(t, err)

factory := &Factory{}
factories.Exporters[configmodels.Type(typeStr)] = factory
cfg, err := config.LoadConfigFile(
t, path.Join(".", "testdata", "config.yaml"), factories,
)
require.NoError(t, err)
require.NotNil(t, cfg)

assert.Equal(t, len(cfg.Exporters), 2)

r0 := cfg.Exporters["elastic"]
assert.Equal(t, r0, factory.CreateDefaultConfig())

r1 := cfg.Exporters["elastic/customname"].(*Config)
assert.Equal(t, r1, &Config{
ExporterSettings: configmodels.ExporterSettings{TypeVal: configmodels.Type(typeStr), NameVal: "elastic/customname"},
APMServerURL: "https://elastic.example.com",
APIKey: "RTNxMjlXNEJt",
SecretToken: "hunter2",
})
}

func TestConfigValidate(t *testing.T) {
var factory Factory
cfg := factory.CreateDefaultConfig().(*Config)
params := component.ExporterCreateParams{Logger: zap.NewNop()}

_, err := factory.CreateTraceExporter(context.Background(), params, cfg)
require.Error(t, err)
assert.EqualError(t, err, "cannot configure Elastic APM trace exporter: invalid config: APMServerURL must be specified")

cfg.APMServerURL = "foo"
_, err = factory.CreateTraceExporter(context.Background(), params, cfg)
assert.NoError(t, err)
}

func TestConfigAuth(t *testing.T) {
testAuth(t, "", "hunter2", "Bearer hunter2")
testAuth(t, "hunter2", "", "ApiKey hunter2")
}

func testAuth(t *testing.T, apiKey, secretToken, expectedAuthorization string) {
var factory Factory
params := component.ExporterCreateParams{Logger: zap.NewNop()}
cfg := factory.CreateDefaultConfig().(*Config)
cfg.APIKey = apiKey
cfg.SecretToken = secretToken

srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
auth := r.Header.Get("Authorization")
if auth != expectedAuthorization {
w.WriteHeader(http.StatusUnauthorized)
fmt.Fprintf(w, "Expected Authorization=%s, got %s\n", expectedAuthorization, auth)
}
}))
defer srv.Close()
cfg.APMServerURL = srv.URL

te, err := factory.CreateTraceExporter(context.Background(), params, cfg)
assert.NoError(t, err)
assert.NotNil(t, te, "failed to create trace exporter")

traces := pdata.NewTraces()
resourceSpans := traces.ResourceSpans()
resourceSpans.Resize(1)
resourceSpans.At(0).InitEmpty()
resourceSpans.At(0).InstrumentationLibrarySpans().Resize(1)
resourceSpans.At(0).InstrumentationLibrarySpans().At(0).Spans().Resize(1)
span := resourceSpans.At(0).InstrumentationLibrarySpans().At(0).Spans().At(0)
span.SetName("foobar")
assert.NoError(t, te.ConsumeTraces(context.Background(), traces))
}
155 changes: 155 additions & 0 deletions exporter/elasticexporter/exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package elasticexporter contains an opentelemetry-collector exporter
// for Elastic APM.
package elasticexporter

import (
"bytes"
"compress/zlib"
"context"
"fmt"
"net/http"
"net/url"

"go.elastic.co/apm/transport"
"go.elastic.co/fastjson"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticexporter/internal/translator/elastic"
)

func newElasticTraceExporter(
params component.ExporterCreateParams,
cfg configmodels.Exporter,
) (component.TraceExporter, error) {
exporter, err := newElasticExporter(cfg.(*Config), params.Logger)
if err != nil {
return nil, fmt.Errorf("cannot configure Elastic APM trace exporter: %v", err)
}
return exporterhelper.NewTraceExporter(cfg, func(ctx context.Context, traces pdata.Traces) (int, error) {
var dropped int
var errs []error
resourceSpansSlice := traces.ResourceSpans()
for i := 0; i < resourceSpansSlice.Len(); i++ {
resourceSpans := resourceSpansSlice.At(i)
n, err := exporter.ExportResourceSpans(ctx, resourceSpans)
if err != nil {
errs = append(errs, err)
}
dropped += n
}
return dropped, componenterror.CombineErrors(errs)
})
}

type elasticExporter struct {
transport transport.Transport
logger *zap.Logger
}

func newElasticExporter(config *Config, logger *zap.Logger) (*elasticExporter, error) {
if err := config.Validate(); err != nil {
return nil, fmt.Errorf("invalid config: %s", err)
}
transport, err := newTransport(config)
if err != nil {
return nil, err
}
return &elasticExporter{transport: transport, logger: logger}, nil
}

func newTransport(config *Config) (transport.Transport, error) {
transport, err := transport.NewHTTPTransport()
if err != nil {
return nil, fmt.Errorf("error creating HTTP transport: %v", err)
}
tlsConfig, err := config.LoadTLSConfig()
if err != nil {
return nil, err
}
httpTransport := transport.Client.Transport.(*http.Transport)
httpTransport.TLSClientConfig = tlsConfig

url, err := url.Parse(config.APMServerURL)
if err != nil {
return nil, err
}
transport.SetServerURL(url)

if config.APIKey != "" {
transport.SetAPIKey(config.APIKey)
} else if config.SecretToken != "" {
transport.SetSecretToken(config.SecretToken)
}

transport.SetUserAgent("opentelemetry-collector")
return transport, nil
}

// ExportResourceSpans exports OTLP trace data to Elastic APM Server,
// returning the number of spans that were dropped along with any errors.
func (e *elasticExporter) ExportResourceSpans(ctx context.Context, rs pdata.ResourceSpans) (int, error) {
var w fastjson.Writer
elastic.EncodeResourceMetadata(rs.Resource(), &w)
var errs []error
var count int
instrumentationLibrarySpansSlice := rs.InstrumentationLibrarySpans()
for i := 0; i < instrumentationLibrarySpansSlice.Len(); i++ {
instrumentationLibrarySpans := instrumentationLibrarySpansSlice.At(i)
instrumentationLibrary := instrumentationLibrarySpans.InstrumentationLibrary()
spanSlice := instrumentationLibrarySpans.Spans()
for i := 0; i < spanSlice.Len(); i++ {
count++
span := spanSlice.At(i)
before := w.Size()
if err := elastic.EncodeSpan(span, instrumentationLibrary, &w); err != nil {
w.Rewind(before)
errs = append(errs, err)
}
}
}
if err := e.sendEvents(ctx, &w); err != nil {
return count, err
}
return len(errs), componenterror.CombineErrors(errs)
}

func (e *elasticExporter) sendEvents(ctx context.Context, w *fastjson.Writer) error {
e.logger.Debug("sending events", zap.ByteString("events", w.Bytes()))

var buf bytes.Buffer
zw, err := zlib.NewWriterLevel(&buf, zlib.DefaultCompression)
if err != nil {
return err
}
if _, err := zw.Write(w.Bytes()); err != nil {
return err
}
if err := zw.Close(); err != nil {
return err
}
if err := e.transport.SendStream(ctx, &buf); err != nil {
// TODO(axw) check response for number of accepted items,
// and take that into account in the result.
return err
}
return nil
}
Loading