Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore] alternative for splunkhec to use otel #33924

Merged
merged 5 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion exporter/splunkhecexporter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ import (
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/otel/metric"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk"
)

Expand Down Expand Up @@ -53,6 +55,7 @@ type client struct {
heartbeater *heartbeater
bufferPool bufferPool
exporterName string
meter metric.Meter
}

var jsonStreamPool = sync.Pool{
Expand All @@ -69,6 +72,7 @@ func newClient(set exporter.Settings, cfg *Config, maxContentLength uint) *clien
buildInfo: set.BuildInfo,
bufferPool: newBufferPool(maxContentLength, !cfg.DisableCompression),
exporterName: set.ID.String(),
meter: metadata.Meter(set.TelemetrySettings),
}
}

Expand Down Expand Up @@ -632,7 +636,7 @@ func (c *client) start(ctx context.Context, host component.Host) (err error) {
}
url, _ := c.config.getURL()
c.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(c.config, c.buildInfo), c.logger}
c.heartbeater = newHeartbeater(c.config, c.buildInfo, getPushLogFn(c))
c.heartbeater = newHeartbeater(c.config, c.buildInfo, getPushLogFn(c), c.meter)
if c.config.Heartbeat.Startup {
if err := c.heartbeater.sendHeartbeat(c.config, c.buildInfo, getPushLogFn(c)); err != nil {
return fmt.Errorf("%s: heartbeat on startup failed: %w", c.exporterName, err)
Expand Down
5 changes: 2 additions & 3 deletions exporter/splunkhecexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.104.0
github.com/stretchr/testify v1.9.0
github.com/testcontainers/testcontainers-go v0.31.0
go.opencensus.io v0.24.0
go.opentelemetry.io/collector/component v0.104.0
go.opentelemetry.io/collector/config/confighttp v0.104.0
go.opentelemetry.io/collector/config/configopaque v1.11.0
Expand All @@ -23,7 +22,9 @@ require (
go.opentelemetry.io/collector/exporter v0.104.0
go.opentelemetry.io/collector/pdata v1.11.0
go.opentelemetry.io/collector/semconv v0.104.0
go.opentelemetry.io/otel v1.27.0
go.opentelemetry.io/otel/metric v1.27.0
go.opentelemetry.io/otel/sdk/metric v1.27.0
go.opentelemetry.io/otel/trace v1.27.0
go.uber.org/goleak v1.3.0
go.uber.org/multierr v1.11.0
Expand Down Expand Up @@ -99,10 +100,8 @@ require (
go.opentelemetry.io/collector/featuregate v1.11.0 // indirect
go.opentelemetry.io/collector/receiver v0.104.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.52.0 // indirect
go.opentelemetry.io/otel v1.27.0 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.49.0 // indirect
go.opentelemetry.io/otel/sdk v1.27.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.27.0 // indirect
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.26.0 // indirect
Expand Down
76 changes: 0 additions & 76 deletions exporter/splunkhecexporter/go.sum

Large diffs are not rendered by default.

66 changes: 25 additions & 41 deletions exporter/splunkhecexporter/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@ import (
"runtime"
"time"

"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk"
)
Expand All @@ -36,52 +35,38 @@ func getMetricsName(overrides map[string]string, metricName string) string {
return metricName
}

func newHeartbeater(config *Config, buildInfo component.BuildInfo, pushLogFn func(ctx context.Context, ld plog.Logs) error) *heartbeater {
func newHeartbeater(config *Config, buildInfo component.BuildInfo, pushLogFn func(ctx context.Context, ld plog.Logs) error, meter metric.Meter) *heartbeater {
interval := config.Heartbeat.Interval
if interval == 0 {
return nil
}

var heartbeatsSent, heartbeatsFailed *stats.Int64Measure
var tagMutators []tag.Mutator
var heartbeatsSent, heartbeatsFailed metric.Int64Counter
var attrs attribute.Set
if config.Telemetry.Enabled {
overrides := config.Telemetry.OverrideMetricsNames
extraAttributes := config.Telemetry.ExtraAttributes
var tags []tag.Key
tagMutators = []tag.Mutator{}
var tags []attribute.KeyValue
for key, val := range extraAttributes {
newTag, _ := tag.NewKey(key)
tags = append(tags, newTag)
tagMutators = append(tagMutators, tag.Insert(newTag, val))
tags = append(tags, attribute.String(key, val))
}

heartbeatsSent = stats.Int64(
attrs = attribute.NewSet(tags...)
var err error
heartbeatsSent, err = meter.Int64Counter(
getMetricsName(overrides, defaultHBSentMetricsName),
"number of heartbeats sent",
stats.UnitDimensionless)

heartbeatsSentView := &view.View{
Name: heartbeatsSent.Name(),
Description: heartbeatsSent.Description(),
TagKeys: tags,
Measure: heartbeatsSent,
Aggregation: view.Sum(),
metric.WithDescription("number of heartbeats sent"),
metric.WithUnit("1"),
)
if err != nil {
return nil
}

heartbeatsFailed = stats.Int64(
heartbeatsFailed, err = meter.Int64Counter(
getMetricsName(overrides, defaultHBFailedMetricsName),
"number of heartbeats failed",
stats.UnitDimensionless)

heartbeatsFailedView := &view.View{
Name: heartbeatsFailed.Name(),
Description: heartbeatsFailed.Description(),
TagKeys: tags,
Measure: heartbeatsFailed,
Aggregation: view.Sum(),
}

if err := view.Register(heartbeatsSentView, heartbeatsFailedView); err != nil {
metric.WithDescription("number of heartbeats failed"),
metric.WithUnit("1"),
)
if err != nil {
return nil
}
}
Expand All @@ -99,7 +84,7 @@ func newHeartbeater(config *Config, buildInfo component.BuildInfo, pushLogFn fun
case <-ticker.C:
err := hbter.sendHeartbeat(config, buildInfo, pushLogFn)
if config.Telemetry.Enabled {
observe(heartbeatsSent, heartbeatsFailed, tagMutators, err)
observe(heartbeatsSent, heartbeatsFailed, attrs, err)
}
}
}
Expand All @@ -116,14 +101,13 @@ func (h *heartbeater) sendHeartbeat(config *Config, buildInfo component.BuildInf
}

// there is only use case for open census metrics recording for now. Extend to use open telemetry in the future.
func observe(heartbeatsSent *stats.Int64Measure, heartbeatsFailed *stats.Int64Measure, tagMutators []tag.Mutator, err error) {
var counter *stats.Int64Measure
func observe(heartbeatsSent, heartbeatsFailed metric.Int64Counter, attrs attribute.Set, err error) {
if err == nil {
counter = heartbeatsSent
heartbeatsSent.Add(context.Background(), 1, metric.WithAttributeSet(attrs))
} else {
counter = heartbeatsFailed
heartbeatsFailed.Add(context.Background(), 1, metric.WithAttributeSet(attrs))
}
_ = stats.RecordWithTags(context.Background(), tagMutators, counter.M(1))

}

func generateHeartbeatLog(hecToOtelAttrs splunk.HecToOtelAttrs, buildInfo component.BuildInfo) plog.Logs {
Expand Down
96 changes: 54 additions & 42 deletions exporter/splunkhecexporter/heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ import (
"time"

"github.com/stretchr/testify/assert"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/otel/attribute"
metricnoop "go.opentelemetry.io/otel/metric/noop"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

const (
Expand All @@ -36,9 +39,9 @@ func createTestConfig(metricsOverrides map[string]string, enableMetrics bool) *C
return config
}

func initHeartbeater(t *testing.T, metricsOverrides map[string]string, enableMetrics bool, consumeFn func(ctx context.Context, ld plog.Logs) error) {
func initHeartbeater(t *testing.T, metricsOverrides map[string]string, enableMetrics bool, consumeFn func(ctx context.Context, ld plog.Logs) error, mp *sdkmetric.MeterProvider) {
config := createTestConfig(metricsOverrides, enableMetrics)
hbter := newHeartbeater(config, component.NewDefaultBuildInfo(), consumeFn)
hbter := newHeartbeater(config, component.NewDefaultBuildInfo(), consumeFn, mp.Meter("test"))
t.Cleanup(func() {
hbter.shutdown()
})
Expand All @@ -49,42 +52,42 @@ func assertHeartbeatInfoLog(t *testing.T, l plog.Logs) {
assert.Contains(t, l.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().AsString(), "HeartbeatInfo")
}

func getMetricValue(metricName string) []float64 {
viewData, _ := view.RetrieveData(metricName)
var ret []float64
if len(viewData) > 0 {
for _, data := range viewData {
ret = append(ret, data.Data.(*view.SumData).Value)
func getMetricValue(reader *sdkmetric.ManualReader, name string) ([]int64, error) {
var md metricdata.ResourceMetrics
err := reader.Collect(context.Background(), &md)
var ret []int64
for _, sm := range md.ScopeMetrics {
for _, m := range sm.Metrics {
if m.Name == name {
g := m.Data.(metricdata.Sum[int64])
ret = append(ret, g.DataPoints[0].Value)
}
}
}
return ret
return ret, err
}

func getTags(metricName string) [][]tag.Tag {
viewData, _ := view.RetrieveData(metricName)
var ret [][]tag.Tag
if len(viewData) > 0 {
for _, data := range viewData {
ret = append(ret, data.Tags)
}
}
return ret
}

func resetMetrics(metricsNames ...string) {
for _, metricsName := range metricsNames {
if v := view.Find(metricsName); v != nil {
view.Unregister(v)
func getAttributes(reader *sdkmetric.ManualReader, name string) ([]attribute.Set, error) {
var md metricdata.ResourceMetrics
err := reader.Collect(context.Background(), &md)
var ret []attribute.Set
for _, sm := range md.ScopeMetrics {
for _, m := range sm.Metrics {
if m.Name == name {
g := m.Data.(metricdata.Sum[int64])
ret = append(ret, g.DataPoints[0].Attributes)
}
}
}
return ret, err
}

func Test_newHeartbeater_disabled(t *testing.T) {
config := createTestConfig(map[string]string{}, false)
config.Heartbeat.Interval = 0
hb := newHeartbeater(config, component.NewDefaultBuildInfo(), func(_ context.Context, _ plog.Logs) error {
return nil
})
}, metricnoop.NewMeterProvider().Meter("test"))
assert.Nil(t, hb)
}

Expand Down Expand Up @@ -115,7 +118,9 @@ func Test_Heartbeat_success(t *testing.T) {
consumeLogsChan <- ld
return nil
}
initHeartbeater(t, tt.metricsOverrides, true, consumeFn)
reader := sdkmetric.NewManualReader()
meterProvider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
initHeartbeater(t, tt.metricsOverrides, true, consumeFn, meterProvider)

assert.Eventually(t, func() bool {
return len(consumeLogsChan) != 0
Expand All @@ -126,31 +131,38 @@ func Test_Heartbeat_success(t *testing.T) {

if tt.enableMetrics {
sentMetricsName := getMetricsName(tt.metricsOverrides, defaultHBSentMetricsName)
failedMetricsName := getMetricsName(tt.metricsOverrides, defaultHBFailedMetricsName)

var got []int64
var err error
assert.Eventually(t, func() bool {
return len(getMetricValue(sentMetricsName)) != 0
got, err = getMetricValue(reader, sentMetricsName)
require.NoError(t, err)
return len(got) != 0
}, time.Second, 10*time.Millisecond)
assert.Greater(t, getMetricValue(sentMetricsName)[0], float64(0), "there should be at least one success metric datapoint")
metricLabelKeyTag, _ := tag.NewKey(metricLabelKey)
assert.Equal(t, []tag.Tag{{Key: metricLabelKeyTag, Value: metricLabelVal}}, getTags(sentMetricsName)[0])

resetMetrics(sentMetricsName, failedMetricsName)
assert.Greater(t, got[0], int64(0), "there should be at least one success metric datapoint")
attrs, err := getAttributes(reader, sentMetricsName)
require.NoError(t, err)
assert.Equal(t, attribute.NewSet(attribute.String(metricLabelKey, metricLabelVal)), attrs[0])
}
}
}

func Test_Heartbeat_failure(t *testing.T) {
resetMetrics()
consumeFn := func(_ context.Context, _ plog.Logs) error {
return errors.New("always error")
}
initHeartbeater(t, map[string]string{}, true, consumeFn)
reader := sdkmetric.NewManualReader()
meterProvider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
initHeartbeater(t, map[string]string{}, true, consumeFn, meterProvider)

var got []int64
var err error
assert.Eventually(t, func() bool {
return len(getMetricValue(defaultHBFailedMetricsName)) != 0
got, err = getMetricValue(reader, defaultHBFailedMetricsName)
require.NoError(t, err)
return len(got) != 0
}, time.Second, 10*time.Millisecond)
assert.Greater(t, getMetricValue(defaultHBFailedMetricsName)[0], float64(0), "there should be at least one failure metric datapoint")
metricLabelKeyTag, _ := tag.NewKey(metricLabelKey)
assert.Equal(t, []tag.Tag{{Key: metricLabelKeyTag, Value: metricLabelVal}}, getTags(defaultHBFailedMetricsName)[0])
assert.Greater(t, got[0], int64(0), "there should be at least one failure metric datapoint")
attrs, err := getAttributes(reader, defaultHBFailedMetricsName)
require.NoError(t, err)
assert.Equal(t, attribute.NewSet(attribute.String(metricLabelKey, metricLabelVal)), attrs[0])
}
1 change: 0 additions & 1 deletion receiver/splunkhecreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ require (
github.com/prometheus/common v0.54.0 // indirect
github.com/prometheus/procfs v0.15.0 // indirect
github.com/rs/cors v1.11.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/collector v0.104.0 // indirect
go.opentelemetry.io/collector/config/configauth v0.104.0 // indirect
go.opentelemetry.io/collector/config/configcompression v1.11.0 // indirect
Expand Down
Loading