Skip to content

Commit

Permalink
chore: Switch to using OTel SDK for throughputmeasurement processor (#…
Browse files Browse the repository at this point in the history
…1665)

* move to otel for snapshot, remove throughput wrapper

* fix recording metrics

* fix factories test

* add tests for throughput measurement processor

* add license
  • Loading branch information
BinaryFissionGames authored Jun 5, 2024
1 parent 0feb4e2 commit e503859
Show file tree
Hide file tree
Showing 23 changed files with 3,277 additions and 831 deletions.
5 changes: 0 additions & 5 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,6 @@ func (c *collector) Run(ctx context.Context) error {
return errors.New("service already running")
}

// Register component telemetry before running to ensure a clean state for this run
if err := factories.RegisterComponentTelemetry(); err != nil {
return fmt.Errorf("register component telemetry: %w", err)
}

// The OT collector only supports using settings once during the lifetime
// of a single collector instance. We must remake the settings on each startup.
settings, err := NewSettings(c.configPaths, c.version, c.loggingOpts, c.factories)
Expand Down
34 changes: 1 addition & 33 deletions factories/factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@
package factories

import (
"fmt"

"github.com/observiq/bindplane-agent/internal/throughputwrapper"
"github.com/observiq/bindplane-agent/processor/throughputmeasurementprocessor"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/extension"
Expand All @@ -34,32 +30,14 @@ func DefaultFactories() (otelcol.Factories, error) {
return combineFactories(defaultReceivers, defaultProcessors, defaultExporters, defaultExtensions, defaultConnectors)
}

// RegisterComponentTelemetry registers or re-registers components with telemetry so that any stale metrics are cleaned out.
func RegisterComponentTelemetry() error {
if err := throughputmeasurementprocessor.RegisterMetricViews(); err != nil {
return fmt.Errorf("failed to register throughput measurement processor telemetry: %w", err)
}

if err := throughputwrapper.RegisterMetricViews(); err != nil {
return fmt.Errorf("failed to register throughput wrapper telemetry: %w", err)
}

return nil
}

// combineFactories combines the supplied factories into a single Factories struct.
// Any errors encountered will also be combined into a single error.
func combineFactories(receivers []receiver.Factory, processors []processor.Factory,
exporters []exporter.Factory, extensions []extension.Factory,
connectors []connector.Factory) (otelcol.Factories, error) {
var errs []error

// Ensure component telemetry is registered at least once by having it in this method
if err := RegisterComponentTelemetry(); err != nil {
errs = append(errs, err)
}

receiverMap, err := receiver.MakeFactoryMap(wrapReceivers(receivers)...)
receiverMap, err := receiver.MakeFactoryMap(receivers...)
if err != nil {
errs = append(errs, err)
}
Expand Down Expand Up @@ -92,13 +70,3 @@ func combineFactories(receivers []receiver.Factory, processors []processor.Facto
Connectors: connectorMap,
}, multierr.Combine(errs...)
}

func wrapReceivers(receivers []receiver.Factory) []receiver.Factory {
wrappedReceivers := make([]receiver.Factory, len(defaultReceivers))

for i, recv := range receivers {
wrappedReceivers[i] = throughputwrapper.WrapReceiverFactory(recv)
}

return wrappedReceivers
}
16 changes: 3 additions & 13 deletions factories/factories_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,7 @@ func TestCombineFactories(t *testing.T) {
assert.NoError(t, err)

for _, receiver := range tc.receivers {
// Due to wrapping of receivers we can't compare them like we can other components.
// We compare actual type and the default config against expected to ensure a match
assertReceiverFactory(t, factories.Receivers[receiver.Type()], receiver)
assert.Equal(t, factories.Receivers[receiver.Type()], receiver)
}

for _, processor := range tc.processors {
Expand All @@ -115,10 +113,8 @@ func TestDefaultFactories(t *testing.T) {
factories, err := DefaultFactories()
assert.NoError(t, err)

for _, receiver := range wrapReceivers(defaultReceivers) {
// Due to wrapping of receivers we can't compare them like we can other components.
// We compare actual type and the default config against expected to ensure a match
assertReceiverFactory(t, factories.Receivers[receiver.Type()], receiver)
for _, receiver := range defaultReceivers {
assert.Equal(t, factories.Receivers[receiver.Type()], receiver)
}

for _, processor := range defaultProcessors {
Expand All @@ -133,9 +129,3 @@ func TestDefaultFactories(t *testing.T) {
assert.Equal(t, factories.Extensions[extension.Type()], extension)
}
}

func assertReceiverFactory(t *testing.T, actual, expected receiver.Factory) {
t.Helper()
assert.Equal(t, actual.Type(), expected.Type())
assert.Equal(t, actual.CreateDefaultConfig(), expected.CreateDefaultConfig())
}
60 changes: 0 additions & 60 deletions internal/throughputwrapper/log_consumer.go

This file was deleted.

61 changes: 0 additions & 61 deletions internal/throughputwrapper/log_consumer_test.go

This file was deleted.

61 changes: 0 additions & 61 deletions internal/throughputwrapper/metric_consumer.go

This file was deleted.

63 changes: 0 additions & 63 deletions internal/throughputwrapper/metric_consumer_test.go

This file was deleted.

Loading

0 comments on commit e503859

Please sign in to comment.