Skip to content

Commit

Permalink
[chore] alternative for splunkhec to use otel
Browse files Browse the repository at this point in the history
This is an alternative to open-telemetry#33664, since mdatagen doesn't support changing metric names, this substitutes the opencensus sdk calls with otel directly.

Signed-off-by: Alex Boten <[email protected]>
  • Loading branch information
codeboten committed Jul 4, 2024
1 parent fa987c3 commit f19697c
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 44 deletions.
6 changes: 5 additions & 1 deletion exporter/splunkhecexporter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ import (
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/otel/metric"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk"
)

Expand Down Expand Up @@ -53,6 +55,7 @@ type client struct {
heartbeater *heartbeater
bufferPool bufferPool
exporterName string
meter metric.Meter
}

var jsonStreamPool = sync.Pool{
Expand All @@ -69,6 +72,7 @@ func newClient(set exporter.Settings, cfg *Config, maxContentLength uint) *clien
buildInfo: set.BuildInfo,
bufferPool: newBufferPool(maxContentLength, !cfg.DisableCompression),
exporterName: set.ID.String(),
meter: metadata.Meter(set.TelemetrySettings),
}
}

Expand Down Expand Up @@ -632,7 +636,7 @@ func (c *client) start(ctx context.Context, host component.Host) (err error) {
}
url, _ := c.config.getURL()
c.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(c.config, c.buildInfo), c.logger}
c.heartbeater = newHeartbeater(c.config, c.buildInfo, getPushLogFn(c))
c.heartbeater = newHeartbeater(c.config, c.buildInfo, getPushLogFn(c), c.meter)
if c.config.Heartbeat.Startup {
if err := c.heartbeater.sendHeartbeat(c.config, c.buildInfo, getPushLogFn(c)); err != nil {
return fmt.Errorf("%s: heartbeat on startup failed: %w", c.exporterName, err)
Expand Down
66 changes: 25 additions & 41 deletions exporter/splunkhecexporter/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@ import (
"runtime"
"time"

"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk"
)
Expand All @@ -36,52 +35,38 @@ func getMetricsName(overrides map[string]string, metricName string) string {
return metricName
}

func newHeartbeater(config *Config, buildInfo component.BuildInfo, pushLogFn func(ctx context.Context, ld plog.Logs) error) *heartbeater {
func newHeartbeater(config *Config, buildInfo component.BuildInfo, pushLogFn func(ctx context.Context, ld plog.Logs) error, meter metric.Meter) *heartbeater {
interval := config.Heartbeat.Interval
if interval == 0 {
return nil
}

var heartbeatsSent, heartbeatsFailed *stats.Int64Measure
var tagMutators []tag.Mutator
var heartbeatsSent, heartbeatsFailed metric.Int64Counter
var attrs attribute.Set
if config.Telemetry.Enabled {
overrides := config.Telemetry.OverrideMetricsNames
extraAttributes := config.Telemetry.ExtraAttributes
var tags []tag.Key
tagMutators = []tag.Mutator{}
var tags []attribute.KeyValue
for key, val := range extraAttributes {
newTag, _ := tag.NewKey(key)
tags = append(tags, newTag)
tagMutators = append(tagMutators, tag.Insert(newTag, val))
tags = append(tags, attribute.String(key, val))
}

heartbeatsSent = stats.Int64(
attrs = attribute.NewSet(tags...)
var err error
heartbeatsSent, err = meter.Int64Counter(
getMetricsName(overrides, defaultHBSentMetricsName),
"number of heartbeats sent",
stats.UnitDimensionless)

heartbeatsSentView := &view.View{
Name: heartbeatsSent.Name(),
Description: heartbeatsSent.Description(),
TagKeys: tags,
Measure: heartbeatsSent,
Aggregation: view.Sum(),
metric.WithDescription("number of heartbeats sent"),
metric.WithUnit("1"),
)
if err != nil {
return nil
}

heartbeatsFailed = stats.Int64(
heartbeatsFailed, err = meter.Int64Counter(
getMetricsName(overrides, defaultHBFailedMetricsName),
"number of heartbeats failed",
stats.UnitDimensionless)

heartbeatsFailedView := &view.View{
Name: heartbeatsFailed.Name(),
Description: heartbeatsFailed.Description(),
TagKeys: tags,
Measure: heartbeatsFailed,
Aggregation: view.Sum(),
}

if err := view.Register(heartbeatsSentView, heartbeatsFailedView); err != nil {
metric.WithDescription("number of heartbeats failed"),
metric.WithUnit("1"),
)
if err != nil {
return nil
}
}
Expand All @@ -99,7 +84,7 @@ func newHeartbeater(config *Config, buildInfo component.BuildInfo, pushLogFn fun
case <-ticker.C:
err := hbter.sendHeartbeat(config, buildInfo, pushLogFn)
if config.Telemetry.Enabled {
observe(heartbeatsSent, heartbeatsFailed, tagMutators, err)
observe(heartbeatsSent, heartbeatsFailed, attrs, err)
}
}
}
Expand All @@ -116,14 +101,13 @@ func (h *heartbeater) sendHeartbeat(config *Config, buildInfo component.BuildInf
}

// there is only use case for open census metrics recording for now. Extend to use open telemetry in the future.
func observe(heartbeatsSent *stats.Int64Measure, heartbeatsFailed *stats.Int64Measure, tagMutators []tag.Mutator, err error) {
var counter *stats.Int64Measure
func observe(heartbeatsSent, heartbeatsFailed metric.Int64Counter, attrs attribute.Set, err error) {
if err == nil {
counter = heartbeatsSent
heartbeatsSent.Add(context.Background(), 1, metric.WithAttributeSet(attrs))
} else {
counter = heartbeatsFailed
heartbeatsFailed.Add(context.Background(), 1, metric.WithAttributeSet(attrs))
}
_ = stats.RecordWithTags(context.Background(), tagMutators, counter.M(1))

}

func generateHeartbeatLog(hecToOtelAttrs splunk.HecToOtelAttrs, buildInfo component.BuildInfo) plog.Logs {
Expand Down
5 changes: 3 additions & 2 deletions exporter/splunkhecexporter/heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"go.opencensus.io/tag"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/plog"
metricnoop "go.opentelemetry.io/otel/metric/noop"
)

const (
Expand All @@ -38,7 +39,7 @@ func createTestConfig(metricsOverrides map[string]string, enableMetrics bool) *C

func initHeartbeater(t *testing.T, metricsOverrides map[string]string, enableMetrics bool, consumeFn func(ctx context.Context, ld plog.Logs) error) {
config := createTestConfig(metricsOverrides, enableMetrics)
hbter := newHeartbeater(config, component.NewDefaultBuildInfo(), consumeFn)
hbter := newHeartbeater(config, component.NewDefaultBuildInfo(), consumeFn, metricnoop.NewMeterProvider().Meter("test"))
t.Cleanup(func() {
hbter.shutdown()
})
Expand Down Expand Up @@ -84,7 +85,7 @@ func Test_newHeartbeater_disabled(t *testing.T) {
config.Heartbeat.Interval = 0
hb := newHeartbeater(config, component.NewDefaultBuildInfo(), func(_ context.Context, _ plog.Logs) error {
return nil
})
}, metricnoop.NewMeterProvider().Meter("test"))
assert.Nil(t, hb)
}

Expand Down

0 comments on commit f19697c

Please sign in to comment.