Skip to content

Commit

Permalink
[chore] Fix usage of setupTestTelemetry in filter processor (#36393)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Nov 18, 2024
1 parent ade3e4b commit 2c7daec
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 201 deletions.
2 changes: 1 addition & 1 deletion processor/filterprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
go.opentelemetry.io/collector/consumer v0.113.1-0.20241115165626-8b99b8023ca3
go.opentelemetry.io/collector/consumer/consumertest v0.113.1-0.20241115165626-8b99b8023ca3
go.opentelemetry.io/collector/pdata v1.19.1-0.20241115165626-8b99b8023ca3
go.opentelemetry.io/collector/pipeline v0.113.1-0.20241115165626-8b99b8023ca3
go.opentelemetry.io/collector/processor v0.113.1-0.20241115165626-8b99b8023ca3
go.opentelemetry.io/collector/processor/processortest v0.113.1-0.20241115165626-8b99b8023ca3
go.opentelemetry.io/otel v1.32.0
Expand Down Expand Up @@ -64,7 +65,6 @@ require (
go.opentelemetry.io/collector/featuregate v1.19.1-0.20241115165626-8b99b8023ca3 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.113.1-0.20241115165626-8b99b8023ca3 // indirect
go.opentelemetry.io/collector/pdata/testdata v0.113.1-0.20241115165626-8b99b8023ca3 // indirect
go.opentelemetry.io/collector/pipeline v0.113.1-0.20241115165626-8b99b8023ca3 // indirect
go.opentelemetry.io/collector/processor/processorprofiles v0.113.1-0.20241115165626-8b99b8023ca3 // indirect
go.opentelemetry.io/collector/semconv v0.113.1-0.20241115165626-8b99b8023ca3 // indirect
go.opentelemetry.io/otel/sdk v1.32.0 // indirect
Expand Down
7 changes: 4 additions & 3 deletions processor/filterprocessor/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"

"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pipeline"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/processorhelper"
"go.uber.org/multierr"
Expand All @@ -22,7 +23,7 @@ import (

type filterLogProcessor struct {
skipExpr expr.BoolExpr[ottllog.TransformContext]
telemetry *filterProcessorTelemetry
telemetry *filterTelemetry
logger *zap.Logger
}

Expand All @@ -31,7 +32,7 @@ func newFilterLogsProcessor(set processor.Settings, cfg *Config) (*filterLogProc
logger: set.Logger,
}

fpt, err := newfilterProcessorTelemetry(set)
fpt, err := newFilterTelemetry(set, pipeline.SignalLogs)
if err != nil {
return nil, fmt.Errorf("error creating filter processor telemetry: %w", err)
}
Expand Down Expand Up @@ -92,7 +93,7 @@ func (flp *filterLogProcessor) processLogs(ctx context.Context, ld plog.Logs) (p
})

logCountAfterFilters := ld.LogRecordCount()
flp.telemetry.record(triggerLogsDropped, int64(logCountBeforeFilters-logCountAfterFilters))
flp.telemetry.record(ctx, int64(logCountBeforeFilters-logCountAfterFilters))

if errors != nil {
flp.logger.Error("failed processing logs", zap.Error(errors))
Expand Down
1 change: 1 addition & 0 deletions processor/filterprocessor/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,7 @@ func TestFilterLogProcessorTelemetry(t *testing.T) {
}

tel.assertMetrics(t, want)
require.NoError(t, tel.Shutdown(context.Background()))
}

func constructLogs() plog.Logs {
Expand Down
7 changes: 4 additions & 3 deletions processor/filterprocessor/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pipeline"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/processorhelper"
"go.uber.org/multierr"
Expand All @@ -29,7 +30,7 @@ type filterMetricProcessor struct {
skipResourceExpr expr.BoolExpr[ottlresource.TransformContext]
skipMetricExpr expr.BoolExpr[ottlmetric.TransformContext]
skipDataPointExpr expr.BoolExpr[ottldatapoint.TransformContext]
telemetry *filterProcessorTelemetry
telemetry *filterTelemetry
logger *zap.Logger
}

Expand All @@ -39,7 +40,7 @@ func newFilterMetricProcessor(set processor.Settings, cfg *Config) (*filterMetri
logger: set.Logger,
}

fpt, err := newfilterProcessorTelemetry(set)
fpt, err := newFilterTelemetry(set, pipeline.SignalMetrics)
if err != nil {
return nil, fmt.Errorf("error creating filter processor telemetry: %w", err)
}
Expand Down Expand Up @@ -173,7 +174,7 @@ func (fmp *filterMetricProcessor) processMetrics(ctx context.Context, md pmetric
})

metricDataPointCountAfterFilters := md.DataPointCount()
fmp.telemetry.record(triggerMetricDataPointsDropped, int64(metricDataPointCountBeforeFilters-metricDataPointCountAfterFilters))
fmp.telemetry.record(ctx, int64(metricDataPointCountBeforeFilters-metricDataPointCountAfterFilters))

if errors != nil {
fmp.logger.Error("failed processing metrics", zap.Error(errors))
Expand Down
167 changes: 4 additions & 163 deletions processor/filterprocessor/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,90 +369,21 @@ func TestFilterMetricProcessor(t *testing.T) {

func TestFilterMetricProcessorTelemetry(t *testing.T) {
tel := setupTestTelemetry()
next := new(consumertest.MetricsSink)
cfg := &Config{
Metrics: MetricFilters{
MetricConditions: []string{
"name==\"metric1\"",
},
},
}
factory := NewFactory()
fmp, err := factory.CreateMetrics(
context.Background(),
fmp, err := newFilterMetricProcessor(
tel.NewSettings(),
cfg,
next,
)
assert.NotNil(t, fmp)
assert.NoError(t, err)

caps := fmp.Capabilities()
assert.True(t, caps.MutatesData)
ctx := context.Background()
assert.NoError(t, fmp.Start(ctx, nil))

err = fmp.ConsumeMetrics(context.Background(), testResourceMetrics([]metricWithResource{
{
metricNames: []string{"foo", "bar"},
resourceAttributes: map[string]any{
"attr1": "attr1/val1",
},
},
}))
assert.NoError(t, err)

want := []metricdata.Metrics{
{
Name: "otelcol_processor_filter_datapoints.filtered",
Description: "Number of metric data points dropped by the filter processor",
Unit: "1",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 0,
Attributes: attribute.NewSet(attribute.String("filter", "filter")),
},
},
},
},
{
Name: "otelcol_processor_incoming_items",
Description: "Number of items passed to the processor. [alpha]",
Unit: "{items}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 2,
Attributes: attribute.NewSet(attribute.String("processor", "filter"), attribute.String("otel.signal", "metrics")),
},
},
},
},
{
Name: "otelcol_processor_outgoing_items",
Description: "Number of items emitted from the processor. [alpha]",
Unit: "{items}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 2,
Attributes: attribute.NewSet(attribute.String("processor", "filter"), attribute.String("otel.signal", "metrics")),
},
},
},
},
}

tel.assertMetrics(t, want)

err = fmp.ConsumeMetrics(context.Background(), testResourceMetrics([]metricWithResource{
_, err = fmp.processMetrics(context.Background(), testResourceMetrics([]metricWithResource{
{
metricNames: []string{"metric1", "metric2"},
resourceAttributes: map[string]any{
Expand All @@ -462,7 +393,7 @@ func TestFilterMetricProcessorTelemetry(t *testing.T) {
}))
assert.NoError(t, err)

want = []metricdata.Metrics{
want := []metricdata.Metrics{
{
Name: "otelcol_processor_filter_datapoints.filtered",
Description: "Number of metric data points dropped by the filter processor",
Expand All @@ -478,99 +409,9 @@ func TestFilterMetricProcessorTelemetry(t *testing.T) {
},
},
},
{
Name: "otelcol_processor_incoming_items",
Description: "Number of items passed to the processor. [alpha]",
Unit: "{items}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 4,
Attributes: attribute.NewSet(attribute.String("processor", "filter"), attribute.String("otel.signal", "metrics")),
},
},
},
},
{
Name: "otelcol_processor_outgoing_items",
Description: "Number of items emitted from the processor. [alpha]",
Unit: "{items}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 3,
Attributes: attribute.NewSet(attribute.String("processor", "filter"), attribute.String("otel.signal", "metrics")),
},
},
},
},
}
tel.assertMetrics(t, want)

err = fmp.ConsumeMetrics(context.Background(), testResourceMetrics([]metricWithResource{
{
metricNames: []string{"metric1"},
resourceAttributes: map[string]any{
"attr1": "attr1/val1",
},
},
}))
assert.NoError(t, err)

want = []metricdata.Metrics{
{
Name: "otelcol_processor_filter_datapoints.filtered",
Description: "Number of metric data points dropped by the filter processor",
Unit: "1",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 2,
Attributes: attribute.NewSet(attribute.String("filter", "filter")),
},
},
},
},
{
Name: "otelcol_processor_incoming_items",
Description: "Number of items passed to the processor. [alpha]",
Unit: "{items}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 5,
Attributes: attribute.NewSet(attribute.String("processor", "filter"), attribute.String("otel.signal", "metrics")),
},
},
},
},
{
Name: "otelcol_processor_outgoing_items",
Description: "Number of items emitted from the processor. [alpha]",
Unit: "{items}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 3,
Attributes: attribute.NewSet(attribute.String("processor", "filter"), attribute.String("otel.signal", "metrics")),
},
},
},
},
}
tel.assertMetrics(t, want)

assert.NoError(t, fmp.Shutdown(ctx))
require.NoError(t, tel.Shutdown(context.Background()))
}

func testResourceMetrics(mwrs []metricWithResource) pmetric.Metrics {
Expand Down
51 changes: 23 additions & 28 deletions processor/filterprocessor/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,50 +5,45 @@ package filterprocessor // import "github.com/open-telemetry/opentelemetry-colle

import (
"context"
"fmt"

"go.opentelemetry.io/collector/pipeline"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor/internal/metadata"
)

type trigger int

const (
triggerMetricDataPointsDropped trigger = iota
triggerLogsDropped
triggerSpansDropped
)

type filterProcessorTelemetry struct {
exportCtx context.Context

processorAttr []attribute.KeyValue

telemetryBuilder *metadata.TelemetryBuilder
type filterTelemetry struct {
attr metric.MeasurementOption
counter metric.Int64Counter
}

func newfilterProcessorTelemetry(set processor.Settings) (*filterProcessorTelemetry, error) {
func newFilterTelemetry(set processor.Settings, signal pipeline.Signal) (*filterTelemetry, error) {
telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings)
if err != nil {
return nil, err
}

return &filterProcessorTelemetry{
processorAttr: []attribute.KeyValue{attribute.String(metadata.Type.String(), set.ID.String())},
exportCtx: context.Background(),
telemetryBuilder: telemetryBuilder,
var counter metric.Int64Counter
switch signal {
case pipeline.SignalMetrics:
counter = telemetryBuilder.ProcessorFilterDatapointsFiltered
case pipeline.SignalLogs:
counter = telemetryBuilder.ProcessorFilterLogsFiltered
case pipeline.SignalTraces:
counter = telemetryBuilder.ProcessorFilterSpansFiltered
default:
return nil, fmt.Errorf("unsupported signal type: %v", signal)
}

return &filterTelemetry{
attr: metric.WithAttributeSet(attribute.NewSet(attribute.String(metadata.Type.String(), set.ID.String()))),
counter: counter,
}, nil
}

func (fpt *filterProcessorTelemetry) record(trigger trigger, dropped int64) {
switch trigger {
case triggerMetricDataPointsDropped:
fpt.telemetryBuilder.ProcessorFilterDatapointsFiltered.Add(fpt.exportCtx, dropped, metric.WithAttributes(fpt.processorAttr...))
case triggerLogsDropped:
fpt.telemetryBuilder.ProcessorFilterLogsFiltered.Add(fpt.exportCtx, dropped, metric.WithAttributes(fpt.processorAttr...))
case triggerSpansDropped:
fpt.telemetryBuilder.ProcessorFilterSpansFiltered.Add(fpt.exportCtx, dropped, metric.WithAttributes(fpt.processorAttr...))
}
func (fpt *filterTelemetry) record(ctx context.Context, dropped int64) {
fpt.counter.Add(ctx, dropped, fpt.attr)
}
Loading

0 comments on commit 2c7daec

Please sign in to comment.