Skip to content

Commit

Permalink
Add ingoing and outgoing counts to processorhelper
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Aug 19, 2024
1 parent d2ed276 commit 259cf00
Show file tree
Hide file tree
Showing 10 changed files with 433 additions and 3 deletions.
48 changes: 48 additions & 0 deletions processor/processorhelper/documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,30 @@ Number of spans that were dropped.
| ---- | ----------- | ---------- | --------- |
| {spans} | Sum | Int | true |

### otelcol_processor_incoming_log_records

Number of log records passed to the processor.

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| {records} | Sum | Int | true |

### otelcol_processor_incoming_metric_points

Number of metric points passed to the processor.

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| {datapoints} | Sum | Int | true |

### otelcol_processor_incoming_spans

Number of spans passed to the processor.

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| {spans} | Sum | Int | true |

### otelcol_processor_inserted_log_records

Number of log records that were inserted.
Expand All @@ -78,6 +102,30 @@ Number of spans that were inserted.
| ---- | ----------- | ---------- | --------- |
| {spans} | Sum | Int | true |

### otelcol_processor_outgoing_log_records

Number of log records emitted from the processor.

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| {records} | Sum | Int | true |

### otelcol_processor_outgoing_metric_points

Number of metric points emitted from the processor.

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| {datapoints} | Sum | Int | true |

### otelcol_processor_outgoing_spans

Number of spans emitted from the processor.

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| {spans} | Sum | Int | true |

### otelcol_processor_refused_log_records

Number of log records that were rejected by the next component in the pipeline.
Expand Down
42 changes: 42 additions & 0 deletions processor/processorhelper/internal/metadata/generated_telemetry.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 17 additions & 1 deletion processor/processorhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"

"go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/trace"

"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -39,12 +40,25 @@ func NewLogsProcessor(
return nil, errors.New("nil logsFunc")
}

if set.MeterProvider == nil {
set.MeterProvider = noop.NewMeterProvider()

Check warning on line 44 in processor/processorhelper/logs.go

View check run for this annotation

Codecov / codecov/patch

processor/processorhelper/logs.go#L44

Added line #L44 was not covered by tests
}

obs, err := newObsReport(ObsReportSettings{
ProcessorID: set.ID,
ProcessorCreateSettings: set,
})
if err != nil {
return nil, err

Check warning on line 52 in processor/processorhelper/logs.go

View check run for this annotation

Codecov / codecov/patch

processor/processorhelper/logs.go#L52

Added line #L52 was not covered by tests
}

eventOptions := spanAttributes(set.ID)
bs := fromOptions(options)
logsConsumer, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error {
span := trace.SpanFromContext(ctx)
span.AddEvent("Start processing.", eventOptions)
var err error
recordsIn := ld.LogRecordCount()

ld, err = logsFunc(ctx, ld)
span.AddEvent("End processing.", eventOptions)
if err != nil {
Expand All @@ -53,6 +67,8 @@ func NewLogsProcessor(
}
return err
}
recordsOut := ld.LogRecordCount()
obs.recordInOut(ctx, component.DataTypeLogs, recordsIn, recordsOut)
return nextConsumer.ConsumeLogs(ctx, ld)
}, bs.consumerOptions...)
if err != nil {
Expand Down
75 changes: 75 additions & 0 deletions processor/processorhelper/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,19 @@ package processorhelper
import (
"context"
"errors"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/plog"
Expand Down Expand Up @@ -67,3 +73,72 @@ func newTestLProcessor(retError error) ProcessLogsFunc {
return ld, retError
}
}

func TestLogsProcessor_RecordInOut(t *testing.T) {
// Regardless of how many logs are ingested, emit just one
mockAggregate := func(_ context.Context, _ plog.Logs) (plog.Logs, error) {
ld := plog.NewLogs()
ld.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
return ld, nil
}

incomingLogs := plog.NewLogs()
incomingLogRecords := incomingLogs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords()

// Add 3 records to the incoming
incomingLogRecords.AppendEmpty()
incomingLogRecords.AppendEmpty()
incomingLogRecords.AppendEmpty()

metricReader := sdkmetric.NewManualReader()
set := processortest.NewNopSettings()
set.TelemetrySettings.MetricsLevel = configtelemetry.LevelNormal
set.TelemetrySettings.MeterProvider = sdkmetric.NewMeterProvider(sdkmetric.WithReader(metricReader))

lp, err := NewLogsProcessor(context.Background(), set, &testLogsCfg, consumertest.NewNop(), mockAggregate)
require.NoError(t, err)

assert.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, lp.ConsumeLogs(context.Background(), incomingLogs))
assert.NoError(t, lp.Shutdown(context.Background()))

ownMetrics := new(metricdata.ResourceMetrics)
require.NoError(t, metricReader.Collect(context.Background(), ownMetrics))

require.Len(t, ownMetrics.ScopeMetrics, 1)
require.Len(t, ownMetrics.ScopeMetrics[0].Metrics, 2)

inMetric := ownMetrics.ScopeMetrics[0].Metrics[0]
outMetric := ownMetrics.ScopeMetrics[0].Metrics[1]
if strings.Contains(inMetric.Name, "outgoing") {
inMetric, outMetric = outMetric, inMetric
}

metricdatatest.AssertAggregationsEqual(t, metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.KeyValue{
Key: attribute.Key("processor"),
Value: attribute.StringValue(set.ID.String()),
}),
Value: 3,
},
},
}, inMetric.Data, metricdatatest.IgnoreTimestamp())

metricdatatest.AssertAggregationsEqual(t, metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.KeyValue{
Key: attribute.Key("processor"),
Value: attribute.StringValue(set.ID.String()),
}),
Value: 1,
},
},
}, outMetric.Data, metricdatatest.IgnoreTimestamp())
}
49 changes: 49 additions & 0 deletions processor/processorhelper/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,55 @@ status:

telemetry:
metrics:

processor_incoming_spans:
enabled: true
description: Number of spans passed to the processor.
unit: "{spans}"
sum:
value_type: int
monotonic: true

processor_outgoing_spans:
enabled: true
description: Number of spans emitted from the processor.
unit: "{spans}"
sum:
value_type: int
monotonic: true

processor_incoming_metric_points:
enabled: true
description: Number of metric points passed to the processor.
unit: "{datapoints}"
sum:
value_type: int
monotonic: true

processor_outgoing_metric_points:
enabled: true
description: Number of metric points emitted from the processor.
unit: "{datapoints}"
sum:
value_type: int
monotonic: true

processor_incoming_log_records:
enabled: true
description: Number of log records passed to the processor.
unit: "{records}"
sum:
value_type: int
monotonic: true

processor_outgoing_log_records:
enabled: true
description: Number of log records emitted from the processor.
unit: "{records}"
sum:
value_type: int
monotonic: true

processor_accepted_spans:
enabled: true
description: Number of spans successfully pushed into the next component in the pipeline.
Expand Down
18 changes: 17 additions & 1 deletion processor/processorhelper/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"

"go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/trace"

"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -39,12 +40,25 @@ func NewMetricsProcessor(
return nil, errors.New("nil metricsFunc")
}

if set.MeterProvider == nil {
set.MeterProvider = noop.NewMeterProvider()

Check warning on line 44 in processor/processorhelper/metrics.go

View check run for this annotation

Codecov / codecov/patch

processor/processorhelper/metrics.go#L44

Added line #L44 was not covered by tests
}

obs, err := newObsReport(ObsReportSettings{
ProcessorID: set.ID,
ProcessorCreateSettings: set,
})
if err != nil {
return nil, err

Check warning on line 52 in processor/processorhelper/metrics.go

View check run for this annotation

Codecov / codecov/patch

processor/processorhelper/metrics.go#L52

Added line #L52 was not covered by tests
}

eventOptions := spanAttributes(set.ID)
bs := fromOptions(options)
metricsConsumer, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error {
span := trace.SpanFromContext(ctx)
span.AddEvent("Start processing.", eventOptions)
var err error
pointsIn := md.DataPointCount()

md, err = metricsFunc(ctx, md)
span.AddEvent("End processing.", eventOptions)
if err != nil {
Expand All @@ -53,6 +67,8 @@ func NewMetricsProcessor(
}
return err
}
pointsOut := md.DataPointCount()
obs.recordInOut(ctx, component.DataTypeMetrics, pointsIn, pointsOut)
return nextConsumer.ConsumeMetrics(ctx, md)
}, bs.consumerOptions...)
if err != nil {
Expand Down
Loading

0 comments on commit 259cf00

Please sign in to comment.