diff --git a/internal/fanoutconsumer/logs.go b/internal/fanoutconsumer/logs.go index cc481aed6f3..e0f9f88df4c 100644 --- a/internal/fanoutconsumer/logs.go +++ b/internal/fanoutconsumer/logs.go @@ -46,18 +46,19 @@ func (lsc *logsConsumer) Capabilities() consumer.Capabilities { func (lsc *logsConsumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error { var errs error - // Clone the data before sending to mutable consumers. - // The only exception is the last consumer which is allowed to mutate the data only if there are no - // other non-mutating consumers and the data is mutable. Never share the same data between - // a mutating and a non-mutating consumer since the non-mutating consumer may process - // data async and the mutating consumer may change the data before that. - for i, lc := range lsc.mutable { - if i < len(lsc.mutable)-1 || ld.IsReadOnly() || len(lsc.readonly) > 0 { - clonedLogs := plog.NewLogs() - ld.CopyTo(clonedLogs) - errs = multierr.Append(errs, lc.ConsumeLogs(ctx, clonedLogs)) + if len(lsc.mutable) > 0 { + // Clone the data before sending to all mutating consumers except the last one. + for i := 0; i < len(lsc.mutable)-1; i++ { + errs = multierr.Append(errs, lsc.mutable[i].ConsumeLogs(ctx, cloneLogs(ld))) + } + // Send data as is to the last mutating consumer only if there are no other non-mutating consumers and the + // data is mutable. Never share the same data between a mutating and a non-mutating consumer since the + // non-mutating consumer may process data async and the mutating consumer may change the data before that. + lastConsumer := lsc.mutable[len(lsc.mutable)-1] + if len(lsc.readonly) == 0 && !ld.IsReadOnly() { + errs = multierr.Append(errs, lastConsumer.ConsumeLogs(ctx, ld)) } else { - errs = multierr.Append(errs, lc.ConsumeLogs(ctx, ld)) + errs = multierr.Append(errs, lastConsumer.ConsumeLogs(ctx, cloneLogs(ld))) } } @@ -72,6 +73,12 @@ func (lsc *logsConsumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error { return errs } +func cloneLogs(ld plog.Logs) plog.Logs { + clonedLogs := plog.NewLogs() + ld.CopyTo(clonedLogs) + return clonedLogs +} + var _ connector.LogsRouter = (*logsRouter)(nil) type logsRouter struct { diff --git a/internal/fanoutconsumer/metrics.go b/internal/fanoutconsumer/metrics.go index ac83c1bfe28..13ea0efe0cf 100644 --- a/internal/fanoutconsumer/metrics.go +++ b/internal/fanoutconsumer/metrics.go @@ -44,18 +44,19 @@ func (msc *metricsConsumer) Capabilities() consumer.Capabilities { func (msc *metricsConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { var errs error - // Clone the data before sending to mutable consumers. - // The only exception is the last consumer which is allowed to mutate the data only if there are no - // other non-mutating consumers and the data is mutable. Never share the same data between - // a mutating and a non-mutating consumer since the non-mutating consumer may process - // data async and the mutating consumer may change the data before that. - for i, mc := range msc.mutable { - if i < len(msc.mutable)-1 || md.IsReadOnly() || len(msc.readonly) > 0 { - clonedMetrics := pmetric.NewMetrics() - md.CopyTo(clonedMetrics) - errs = multierr.Append(errs, mc.ConsumeMetrics(ctx, clonedMetrics)) + if len(msc.mutable) > 0 { + // Clone the data before sending to all mutating consumers except the last one. + for i := 0; i < len(msc.mutable)-1; i++ { + errs = multierr.Append(errs, msc.mutable[i].ConsumeMetrics(ctx, cloneMetrics(md))) + } + // Send data as is to the last mutating consumer only if there are no other non-mutating consumers and the + // data is mutable. Never share the same data between a mutating and a non-mutating consumer since the + // non-mutating consumer may process data async and the mutating consumer may change the data before that. + lastConsumer := msc.mutable[len(msc.mutable)-1] + if len(msc.readonly) == 0 && !md.IsReadOnly() { + errs = multierr.Append(errs, lastConsumer.ConsumeMetrics(ctx, md)) } else { - errs = multierr.Append(errs, mc.ConsumeMetrics(ctx, md)) + errs = multierr.Append(errs, lastConsumer.ConsumeMetrics(ctx, cloneMetrics(md))) } } @@ -70,6 +71,12 @@ func (msc *metricsConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metri return errs } +func cloneMetrics(md pmetric.Metrics) pmetric.Metrics { + clonedMetrics := pmetric.NewMetrics() + md.CopyTo(clonedMetrics) + return clonedMetrics +} + var _ connector.MetricsRouter = (*metricsRouter)(nil) type metricsRouter struct { diff --git a/internal/fanoutconsumer/traces.go b/internal/fanoutconsumer/traces.go index 20fcfc90131..bb6c30ae84e 100644 --- a/internal/fanoutconsumer/traces.go +++ b/internal/fanoutconsumer/traces.go @@ -44,18 +44,19 @@ func (tsc *tracesConsumer) Capabilities() consumer.Capabilities { func (tsc *tracesConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { var errs error - // Clone the data before sending to mutable consumers. - // The only exception is the last consumer which is allowed to mutate the data only if there are no - // other non-mutating consumers and the data is mutable. Never share the same data between - // a mutating and a non-mutating consumer since the non-mutating consumer may process - // data async and the mutating consumer may change the data before that. - for i, tc := range tsc.mutable { - if i < len(tsc.mutable)-1 || td.IsReadOnly() || len(tsc.readonly) > 0 { - clonedTraces := ptrace.NewTraces() - td.CopyTo(clonedTraces) - errs = multierr.Append(errs, tc.ConsumeTraces(ctx, clonedTraces)) + if len(tsc.mutable) > 0 { + // Clone the data before sending to all mutating consumers except the last one. + for i := 0; i < len(tsc.mutable)-1; i++ { + errs = multierr.Append(errs, tsc.mutable[i].ConsumeTraces(ctx, cloneTraces(td))) + } + // Send data as is to the last mutating consumer only if there are no other non-mutating consumers and the + // data is mutable. Never share the same data between a mutating and a non-mutating consumer since the + // non-mutating consumer may process data async and the mutating consumer may change the data before that. + lastConsumer := tsc.mutable[len(tsc.mutable)-1] + if len(tsc.readonly) == 0 && !td.IsReadOnly() { + errs = multierr.Append(errs, lastConsumer.ConsumeTraces(ctx, td)) } else { - errs = multierr.Append(errs, tc.ConsumeTraces(ctx, td)) + errs = multierr.Append(errs, lastConsumer.ConsumeTraces(ctx, cloneTraces(td))) } } @@ -70,6 +71,12 @@ func (tsc *tracesConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) return errs } +func cloneTraces(td ptrace.Traces) ptrace.Traces { + clonedTraces := ptrace.NewTraces() + td.CopyTo(clonedTraces) + return clonedTraces +} + var _ connector.TracesRouter = (*tracesRouter)(nil) type tracesRouter struct {