Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pdata] Enable the pdata mutation safeguards in the fanout consumers #8634

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .chloggen/add-is-read-only.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: pdata

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add IsReadOnly() method to p[metric|log|trace].[Metrics|Logs|Traces] pdata structs, allowing callers to check whether the struct is read-only.

# One or more tracking issues or pull requests related to the change
issues: [6794]

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
26 changes: 26 additions & 0 deletions .chloggen/enable-mutation-assertions.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: fanoutconsumer

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Enable runtime assertions to catch incorrect pdata mutations in components that claim not to mutate pdata.

# One or more tracking issues or pull requests related to the change
issues: [6794]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
This change enables the runtime assertions to catch unintentional pdata mutations in components that claim
not to mutate pdata. Without these assertions, runtime errors may still occur, but they are thrown by unrelated
components, making them very difficult to troubleshoot.

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
68 changes: 37 additions & 31 deletions internal/fanoutconsumer/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,36 +20,22 @@ import (
// NewLogs wraps multiple log consumers in a single one.
// It fanouts the incoming data to all the consumers, and does smart routing:
// - Clones only to the consumer that needs to mutate the data.
// - If all consumers need to mutate the data, one will get the original data.
// - If all consumers need to mutate the data, one will get the original mutable data.
func NewLogs(lcs []consumer.Logs) consumer.Logs {
if len(lcs) == 1 {
// Don't wrap if no need to do it.
return lcs[0]
}
var pass []consumer.Logs
var clone []consumer.Logs
for i := 0; i < len(lcs)-1; i++ {
if !lcs[i].Capabilities().MutatesData {
pass = append(pass, lcs[i])
lc := &logsConsumer{}
for i := 0; i < len(lcs); i++ {
if lcs[i].Capabilities().MutatesData {
lc.mutable = append(lc.mutable, lcs[i])
} else {
clone = append(clone, lcs[i])
lc.readonly = append(lc.readonly, lcs[i])
}
}
// Give the original data to the last consumer if no other read-only consumer,
// otherwise put it in the right bucket. Never share the same data between
// a mutating and a non-mutating consumer since the non-mutating consumer may process
// data async and the mutating consumer may change the data before that.
if len(pass) == 0 || !lcs[len(lcs)-1].Capabilities().MutatesData {
pass = append(pass, lcs[len(lcs)-1])
} else {
clone = append(clone, lcs[len(lcs)-1])
}
return &logsConsumer{pass: pass, clone: clone}
return lc
}

type logsConsumer struct {
pass []consumer.Logs
clone []consumer.Logs
mutable []consumer.Logs
readonly []consumer.Logs
}

func (lsc *logsConsumer) Capabilities() consumer.Capabilities {
Expand All @@ -59,20 +45,40 @@ func (lsc *logsConsumer) Capabilities() consumer.Capabilities {
// ConsumeLogs exports the plog.Logs to all consumers wrapped by the current one.
func (lsc *logsConsumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
var errs error
// Initially pass to clone exporter to avoid the case where the optimization of sending
// the incoming data to a mutating consumer is used that may change the incoming data before
// cloning.
for _, lc := range lsc.clone {
clonedLogs := plog.NewLogs()
ld.CopyTo(clonedLogs)
errs = multierr.Append(errs, lc.ConsumeLogs(ctx, clonedLogs))

if len(lsc.mutable) > 0 {
// Clone the data before sending to all mutating consumers except the last one.
for i := 0; i < len(lsc.mutable)-1; i++ {
errs = multierr.Append(errs, lsc.mutable[i].ConsumeLogs(ctx, cloneLogs(ld)))
}
// Send data as-is to the last mutating consumer only if there are no non-mutating consumers and the
// data is mutable. Never share the same data between a mutating and a non-mutating consumer since the
// non-mutating consumer may process data async and the mutating consumer may change the data before that.
lastConsumer := lsc.mutable[len(lsc.mutable)-1]
if len(lsc.readonly) == 0 && !ld.IsReadOnly() {
errs = multierr.Append(errs, lastConsumer.ConsumeLogs(ctx, ld))
} else {
errs = multierr.Append(errs, lastConsumer.ConsumeLogs(ctx, cloneLogs(ld)))
}
}
for _, lc := range lsc.pass {

// Mark the data as read-only if it will be sent to more than one read-only consumer.
if len(lsc.readonly) > 1 && !ld.IsReadOnly() {
ld.MarkReadOnly()
}
for _, lc := range lsc.readonly {
errs = multierr.Append(errs, lc.ConsumeLogs(ctx, ld))
}

return errs
}

// cloneLogs creates a new plog.Logs, copies ld into it with CopyTo, and returns
// the new instance. The fanout uses it to hand a consumer its own copy rather
// than the incoming ld.
func cloneLogs(ld plog.Logs) plog.Logs {
	dup := plog.NewLogs()
	ld.CopyTo(dup)
	return dup
}

var _ connector.LogsRouter = (*logsRouter)(nil)

type logsRouter struct {
Expand Down
55 changes: 49 additions & 6 deletions internal/fanoutconsumer/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,6 @@ import (
"go.opentelemetry.io/collector/pdata/plog"
)

// TestLogsNotMultiplexing checks that NewLogs, when given a slice holding a single
// consumer, returns that very consumer (pointer identity) instead of a wrapper.
func TestLogsNotMultiplexing(t *testing.T) {
	only := consumertest.NewNop()
	got := NewLogs([]consumer.Logs{only})
	assert.Same(t, only, got)
}

func TestLogsMultiplexingNonMutating(t *testing.T) {
p1 := new(consumertest.LogsSink)
p2 := new(consumertest.LogsSink)
Expand Down Expand Up @@ -57,6 +51,9 @@ func TestLogsMultiplexingNonMutating(t *testing.T) {
assert.True(t, ld == p3.AllLogs()[1])
assert.EqualValues(t, ld, p3.AllLogs()[0])
assert.EqualValues(t, ld, p3.AllLogs()[1])

// The data should be marked as read only.
assert.True(t, ld.IsReadOnly())
}

func TestLogsMultiplexingMutating(t *testing.T) {
Expand Down Expand Up @@ -91,6 +88,46 @@ func TestLogsMultiplexingMutating(t *testing.T) {
assert.True(t, ld == p3.AllLogs()[1])
assert.EqualValues(t, ld, p3.AllLogs()[0])
assert.EqualValues(t, ld, p3.AllLogs()[1])

// The data should not be marked as read only.
assert.False(t, ld.IsReadOnly())
}

// TestReadOnlyLogsMultiplexingMutating sends logs that were marked read-only through a
// fanout built exclusively from mutating sinks, twice, and asserts that every sink
// received an instance different from the input but equal in value to a pristine copy.
func TestReadOnlyLogsMultiplexingMutating(t *testing.T) {
	sinks := []*mutatingLogsSink{
		{LogsSink: new(consumertest.LogsSink)},
		{LogsSink: new(consumertest.LogsSink)},
		{LogsSink: new(consumertest.LogsSink)},
	}

	lfc := NewLogs([]consumer.Logs{sinks[0], sinks[1], sinks[2]})
	assert.False(t, lfc.Capabilities().MutatesData)

	// want is a reference copy that is never sent; input is the instance that is
	// marked read-only and pushed through the fanout.
	want := testdata.GenerateLogs(1)
	input := testdata.GenerateLogs(1)
	input.MarkReadOnly()

	const rounds = 2
	for round := 0; round < rounds; round++ {
		if err := lfc.ConsumeLogs(context.Background(), input); err != nil {
			t.Errorf("Wanted nil got error")
			return
		}
	}

	// All consumers should receive the cloned data.
	for _, sink := range sinks {
		// Identity: the sink must not hold the input instance itself.
		for round := 0; round < rounds; round++ {
			assert.True(t, input != sink.AllLogs()[round])
		}
		// Content: what the sink holds must match the pristine reference copy.
		for round := 0; round < rounds; round++ {
			assert.EqualValues(t, want, sink.AllLogs()[round])
		}
	}
}

func TestLogsMultiplexingMixLastMutating(t *testing.T) {
Expand Down Expand Up @@ -126,6 +163,9 @@ func TestLogsMultiplexingMixLastMutating(t *testing.T) {
assert.True(t, ld != p3.AllLogs()[1])
assert.EqualValues(t, ld, p3.AllLogs()[0])
assert.EqualValues(t, ld, p3.AllLogs()[1])

// The data should not be marked as read only.
assert.False(t, ld.IsReadOnly())
}

func TestLogsMultiplexingMixLastNonMutating(t *testing.T) {
Expand Down Expand Up @@ -160,6 +200,9 @@ func TestLogsMultiplexingMixLastNonMutating(t *testing.T) {
assert.True(t, ld == p3.AllLogs()[1])
assert.EqualValues(t, ld, p3.AllLogs()[0])
assert.EqualValues(t, ld, p3.AllLogs()[1])

// The data should not be marked as read only.
assert.False(t, ld.IsReadOnly())
}

func TestLogsWhenErrors(t *testing.T) {
Expand Down
68 changes: 37 additions & 31 deletions internal/fanoutconsumer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,22 @@ import (
// NewMetrics wraps multiple metrics consumers in a single one.
// It fanouts the incoming data to all the consumers, and does smart routing:
// - Clones only to the consumer that needs to mutate the data.
// - If all consumers need to mutate the data, one will get the original data.
// - If all consumers need to mutate the data, one will get the original mutable data.
func NewMetrics(mcs []consumer.Metrics) consumer.Metrics {
if len(mcs) == 1 {
// Don't wrap if no need to do it.
return mcs[0]
}
var pass []consumer.Metrics
var clone []consumer.Metrics
for i := 0; i < len(mcs)-1; i++ {
if !mcs[i].Capabilities().MutatesData {
pass = append(pass, mcs[i])
mc := &metricsConsumer{}
for i := 0; i < len(mcs); i++ {
if mcs[i].Capabilities().MutatesData {
mc.mutable = append(mc.mutable, mcs[i])
} else {
clone = append(clone, mcs[i])
mc.readonly = append(mc.readonly, mcs[i])
}
}
// Give the original data to the last consumer if no other read-only consumer,
// otherwise put it in the right bucket. Never share the same data between
// a mutating and a non-mutating consumer since the non-mutating consumer may process
// data async and the mutating consumer may change the data before that.
if len(pass) == 0 || !mcs[len(mcs)-1].Capabilities().MutatesData {
pass = append(pass, mcs[len(mcs)-1])
} else {
clone = append(clone, mcs[len(mcs)-1])
}
return &metricsConsumer{pass: pass, clone: clone}
return mc
}

type metricsConsumer struct {
pass []consumer.Metrics
clone []consumer.Metrics
mutable []consumer.Metrics
readonly []consumer.Metrics
}

func (msc *metricsConsumer) Capabilities() consumer.Capabilities {
bogdandrutu marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -57,20 +43,40 @@ func (msc *metricsConsumer) Capabilities() consumer.Capabilities {
// ConsumeMetrics exports the pmetric.Metrics to all consumers wrapped by the current one.
func (msc *metricsConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
var errs error
// Initially pass to clone exporter to avoid the case where the optimization of sending
// the incoming data to a mutating consumer is used that may change the incoming data before
// cloning.
for _, mc := range msc.clone {
clonedMetrics := pmetric.NewMetrics()
md.CopyTo(clonedMetrics)
errs = multierr.Append(errs, mc.ConsumeMetrics(ctx, clonedMetrics))

if len(msc.mutable) > 0 {
// Clone the data before sending to all mutating consumers except the last one.
for i := 0; i < len(msc.mutable)-1; i++ {
errs = multierr.Append(errs, msc.mutable[i].ConsumeMetrics(ctx, cloneMetrics(md)))
}
// Send data as-is to the last mutating consumer only if there are no non-mutating consumers and the
// data is mutable. Never share the same data between a mutating and a non-mutating consumer since the
// non-mutating consumer may process data async and the mutating consumer may change the data before that.
lastConsumer := msc.mutable[len(msc.mutable)-1]
if len(msc.readonly) == 0 && !md.IsReadOnly() {
errs = multierr.Append(errs, lastConsumer.ConsumeMetrics(ctx, md))
} else {
errs = multierr.Append(errs, lastConsumer.ConsumeMetrics(ctx, cloneMetrics(md)))
}
}
for _, mc := range msc.pass {

// Mark the data as read-only if it will be sent to more than one read-only consumer.
if len(msc.readonly) > 1 && !md.IsReadOnly() {
md.MarkReadOnly()
}
for _, mc := range msc.readonly {
errs = multierr.Append(errs, mc.ConsumeMetrics(ctx, md))
}

return errs
}

// cloneMetrics creates a new pmetric.Metrics, copies md into it with CopyTo, and
// returns the new instance. The fanout uses it to hand a consumer its own copy
// rather than the incoming md.
func cloneMetrics(md pmetric.Metrics) pmetric.Metrics {
	dup := pmetric.NewMetrics()
	md.CopyTo(dup)
	return dup
}

var _ connector.MetricsRouter = (*metricsRouter)(nil)

type metricsRouter struct {
Expand Down
Loading