Skip to content

Commit

Permalink
Try making the splitting logic cleaner
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitryax committed Oct 16, 2023
1 parent e447ac8 commit 6d9cb0f
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 33 deletions.
29 changes: 18 additions & 11 deletions internal/fanoutconsumer/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,19 @@ func (lsc *logsConsumer) Capabilities() consumer.Capabilities {
func (lsc *logsConsumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
var errs error

// Clone the data before sending to mutable consumers.
// The only exception is the last consumer which is allowed to mutate the data only if there are no
// other non-mutating consumers and the data is mutable. Never share the same data between
// a mutating and a non-mutating consumer since the non-mutating consumer may process
// data async and the mutating consumer may change the data before that.
for i, lc := range lsc.mutable {
if i < len(lsc.mutable)-1 || ld.IsReadOnly() || len(lsc.readonly) > 0 {
clonedLogs := plog.NewLogs()
ld.CopyTo(clonedLogs)
errs = multierr.Append(errs, lc.ConsumeLogs(ctx, clonedLogs))
if len(lsc.mutable) > 0 {
// Clone the data before sending to all mutating consumers except the last one.
for i := 0; i < len(lsc.mutable)-1; i++ {
errs = multierr.Append(errs, lsc.mutable[i].ConsumeLogs(ctx, cloneLogs(ld)))
}
// Send data as is to the last mutating consumer only if there are no other non-mutating consumers and the
// data is mutable. Never share the same data between a mutating and a non-mutating consumer since the
// non-mutating consumer may process data async and the mutating consumer may change the data before that.
lastConsumer := lsc.mutable[len(lsc.mutable)-1]
if len(lsc.readonly) == 0 && !ld.IsReadOnly() {
errs = multierr.Append(errs, lastConsumer.ConsumeLogs(ctx, ld))
} else {
errs = multierr.Append(errs, lc.ConsumeLogs(ctx, ld))
errs = multierr.Append(errs, lastConsumer.ConsumeLogs(ctx, cloneLogs(ld)))
}
}

Expand All @@ -72,6 +73,12 @@ func (lsc *logsConsumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
return errs
}

func cloneLogs(ld plog.Logs) plog.Logs {
clonedLogs := plog.NewLogs()
ld.CopyTo(clonedLogs)
return clonedLogs
}

var _ connector.LogsRouter = (*logsRouter)(nil)

type logsRouter struct {
Expand Down
29 changes: 18 additions & 11 deletions internal/fanoutconsumer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,19 @@ func (msc *metricsConsumer) Capabilities() consumer.Capabilities {
func (msc *metricsConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
var errs error

// Clone the data before sending to mutable consumers.
// The only exception is the last consumer which is allowed to mutate the data only if there are no
// other non-mutating consumers and the data is mutable. Never share the same data between
// a mutating and a non-mutating consumer since the non-mutating consumer may process
// data async and the mutating consumer may change the data before that.
for i, mc := range msc.mutable {
if i < len(msc.mutable)-1 || md.IsReadOnly() || len(msc.readonly) > 0 {
clonedMetrics := pmetric.NewMetrics()
md.CopyTo(clonedMetrics)
errs = multierr.Append(errs, mc.ConsumeMetrics(ctx, clonedMetrics))
if len(msc.mutable) > 0 {
// Clone the data before sending to all mutating consumers except the last one.
for i := 0; i < len(msc.mutable)-1; i++ {
errs = multierr.Append(errs, msc.mutable[i].ConsumeMetrics(ctx, cloneMetrics(md)))
}
// Send data as is to the last mutating consumer only if there are no other non-mutating consumers and the
// data is mutable. Never share the same data between a mutating and a non-mutating consumer since the
// non-mutating consumer may process data async and the mutating consumer may change the data before that.
lastConsumer := msc.mutable[len(msc.mutable)-1]
if len(msc.readonly) == 0 && !md.IsReadOnly() {
errs = multierr.Append(errs, lastConsumer.ConsumeMetrics(ctx, md))
} else {
errs = multierr.Append(errs, mc.ConsumeMetrics(ctx, md))
errs = multierr.Append(errs, lastConsumer.ConsumeMetrics(ctx, cloneMetrics(md)))
}
}

Expand All @@ -70,6 +71,12 @@ func (msc *metricsConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metri
return errs
}

func cloneMetrics(md pmetric.Metrics) pmetric.Metrics {
clonedMetrics := pmetric.NewMetrics()
md.CopyTo(clonedMetrics)
return clonedMetrics
}

var _ connector.MetricsRouter = (*metricsRouter)(nil)

type metricsRouter struct {
Expand Down
29 changes: 18 additions & 11 deletions internal/fanoutconsumer/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,19 @@ func (tsc *tracesConsumer) Capabilities() consumer.Capabilities {
func (tsc *tracesConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
var errs error

// Clone the data before sending to mutable consumers.
// The only exception is the last consumer which is allowed to mutate the data only if there are no
// other non-mutating consumers and the data is mutable. Never share the same data between
// a mutating and a non-mutating consumer since the non-mutating consumer may process
// data async and the mutating consumer may change the data before that.
for i, tc := range tsc.mutable {
if i < len(tsc.mutable)-1 || td.IsReadOnly() || len(tsc.readonly) > 0 {
clonedTraces := ptrace.NewTraces()
td.CopyTo(clonedTraces)
errs = multierr.Append(errs, tc.ConsumeTraces(ctx, clonedTraces))
if len(tsc.mutable) > 0 {
// Clone the data before sending to all mutating consumers except the last one.
for i := 0; i < len(tsc.mutable)-1; i++ {
errs = multierr.Append(errs, tsc.mutable[i].ConsumeTraces(ctx, cloneTraces(td)))
}
// Send data as is to the last mutating consumer only if there are no other non-mutating consumers and the
// data is mutable. Never share the same data between a mutating and a non-mutating consumer since the
// non-mutating consumer may process data async and the mutating consumer may change the data before that.
lastConsumer := tsc.mutable[len(tsc.mutable)-1]
if len(tsc.readonly) == 0 && !td.IsReadOnly() {
errs = multierr.Append(errs, lastConsumer.ConsumeTraces(ctx, td))
} else {
errs = multierr.Append(errs, tc.ConsumeTraces(ctx, td))
errs = multierr.Append(errs, lastConsumer.ConsumeTraces(ctx, cloneTraces(td)))
}
}

Expand All @@ -70,6 +71,12 @@ func (tsc *tracesConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces)
return errs
}

func cloneTraces(td ptrace.Traces) ptrace.Traces {
clonedTraces := ptrace.NewTraces()
td.CopyTo(clonedTraces)
return clonedTraces
}

var _ connector.TracesRouter = (*tracesRouter)(nil)

type tracesRouter struct {
Expand Down

0 comments on commit 6d9cb0f

Please sign in to comment.