Skip to content

Commit

Permalink
Remove usages of deprecated pdata Clone methods (#6170)
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitryax authored Sep 29, 2022
1 parent c8871dc commit 7740c7b
Show file tree
Hide file tree
Showing 12 changed files with 24 additions and 139 deletions.
3 changes: 2 additions & 1 deletion exporter/otlpexporter/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,8 @@ func startServerAndMakeRequest(t *testing.T, exp component.TracesExporter, td pt
assert.EqualValues(t, 0, rcv.requestCount.Load())

// Clone the request and store as expected.
expectedData := td.Clone()
expectedData := ptrace.NewTraces()
td.CopyTo(expectedData)

// Resend the request, this should succeed.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
Expand Down
2 changes: 1 addition & 1 deletion pdata/pmetric/pmetricotlp/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func RegisterServer(s *grpc.Server, srv GRPCServer) {
}

type rawMetricsServer struct {
srv Server
srv GRPCServer
}

func (s rawMetricsServer) Export(ctx context.Context, request *otlpcollectormetrics.ExportMetricsServiceRequest) (*otlpcollectormetrics.ExportMetricsServiceResponse, error) {
Expand Down
18 changes: 9 additions & 9 deletions processor/batchprocessor/batch_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ func TestBatchProcessorSpansDelivered(t *testing.T) {

requestCount := 1000
spansPerRequest := 100
traceDataSlice := make([]ptrace.Traces, 0, requestCount)
sentResourceSpans := ptrace.NewTraces().ResourceSpans()
for requestNum := 0; requestNum < requestCount; requestNum++ {
td := testdata.GenerateTraces(spansPerRequest)
spans := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans()
for spanIndex := 0; spanIndex < spansPerRequest; spanIndex++ {
spans.At(spanIndex).SetName(getTestSpanName(requestNum, spanIndex))
}
traceDataSlice = append(traceDataSlice, td.Clone())
td.ResourceSpans().At(0).CopyTo(sentResourceSpans.AppendEmpty())
assert.NoError(t, batcher.ConsumeTraces(context.Background(), td))
}

Expand All @@ -69,7 +69,7 @@ func TestBatchProcessorSpansDelivered(t *testing.T) {
receivedTraces := sink.AllTraces()
spansReceivedByName := spansReceivedByName(receivedTraces)
for requestNum := 0; requestNum < requestCount; requestNum++ {
spans := traceDataSlice[requestNum].ResourceSpans().At(0).ScopeSpans().At(0).Spans()
spans := sentResourceSpans.At(requestNum).ScopeSpans().At(0).Spans()
for spanIndex := 0; spanIndex < spansPerRequest; spanIndex++ {
require.EqualValues(t,
spans.At(spanIndex),
Expand Down Expand Up @@ -327,15 +327,15 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) {
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

metricDataSlice := make([]pmetric.Metrics, 0, requestCount)
sentResourceMetrics := pmetric.NewMetrics().ResourceMetrics()

for requestNum := 0; requestNum < requestCount; requestNum++ {
md := testdata.GenerateMetrics(metricsPerRequest)
metrics := md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics()
for metricIndex := 0; metricIndex < metricsPerRequest; metricIndex++ {
metrics.At(metricIndex).SetName(getTestMetricName(requestNum, metricIndex))
}
metricDataSlice = append(metricDataSlice, md.Clone())
md.ResourceMetrics().At(0).CopyTo(sentResourceMetrics.AppendEmpty())
assert.NoError(t, batcher.ConsumeMetrics(context.Background(), md))
}

Expand All @@ -349,7 +349,7 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) {
receivedMds := sink.AllMetrics()
metricsReceivedByName := metricsReceivedByName(receivedMds)
for requestNum := 0; requestNum < requestCount; requestNum++ {
metrics := metricDataSlice[requestNum].ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics()
metrics := sentResourceMetrics.At(requestNum).ScopeMetrics().At(0).Metrics()
for metricIndex := 0; metricIndex < metricsPerRequest; metricIndex++ {
require.EqualValues(t,
metrics.At(metricIndex),
Expand Down Expand Up @@ -645,15 +645,15 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) {
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

logDataSlice := make([]plog.Logs, 0, requestCount)
sentResourceLogs := plog.NewLogs().ResourceLogs()

for requestNum := 0; requestNum < requestCount; requestNum++ {
ld := testdata.GenerateLogs(logsPerRequest)
logs := ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords()
for logIndex := 0; logIndex < logsPerRequest; logIndex++ {
logs.At(logIndex).SetSeverityText(getTestLogSeverityText(requestNum, logIndex))
}
logDataSlice = append(logDataSlice, ld.Clone())
ld.ResourceLogs().At(0).CopyTo(sentResourceLogs.AppendEmpty())
assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld))
}

Expand All @@ -667,7 +667,7 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) {
receivedMds := sink.AllLogs()
logsReceivedBySeverityText := logsReceivedBySeverityText(receivedMds)
for requestNum := 0; requestNum < requestCount; requestNum++ {
logs := logDataSlice[requestNum].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords()
logs := sentResourceLogs.At(requestNum).ScopeLogs().At(0).LogRecords()
for logIndex := 0; logIndex < logsPerRequest; logIndex++ {
require.EqualValues(t,
logs.At(logIndex),
Expand Down
31 changes: 0 additions & 31 deletions processor/batchprocessor/splitlogs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,34 +156,3 @@ func TestSplitLogsMultipleILL(t *testing.T) {
assert.Equal(t, "test-log-int-0-0", split.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).SeverityText())
assert.Equal(t, "test-log-int-0-4", split.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(4).SeverityText())
}

func BenchmarkSplitLogs(b *testing.B) {
md := plog.NewLogs()
rms := md.ResourceLogs()
for i := 0; i < 20; i++ {
testdata.GenerateLogs(20).ResourceLogs().MoveAndAppendTo(md.ResourceLogs())
ms := rms.At(rms.Len() - 1).ScopeLogs().At(0).LogRecords()
for i := 0; i < ms.Len(); i++ {
ms.At(i).SetSeverityText(getTestLogSeverityText(1, i))
}
}

if b.N > 100000 {
b.Skipf("SKIP: b.N too high, set -benchtime=<n>x with n < 100000")
}

clones := make([]plog.Logs, b.N)
for n := 0; n < b.N; n++ {
clones[n] = md.Clone()
}

b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
cloneReq := clones[n]
split := splitLogs(128, cloneReq)
if split.LogRecordCount() != 128 || cloneReq.LogRecordCount() != 400-128 {
b.Fail()
}
}
}
31 changes: 0 additions & 31 deletions processor/batchprocessor/splitmetrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,34 +325,3 @@ func TestSplitMetricsMultipleILM(t *testing.T) {
assert.Equal(t, "test-metric-int-0-0", split.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Name())
assert.Equal(t, "test-metric-int-0-4", split.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Name())
}

func BenchmarkSplitMetrics(b *testing.B) {
md := pmetric.NewMetrics()
rms := md.ResourceMetrics()
for i := 0; i < 20; i++ {
testdata.GenerateMetrics(20).ResourceMetrics().MoveAndAppendTo(md.ResourceMetrics())
ms := rms.At(rms.Len() - 1).ScopeMetrics().At(0).Metrics()
for i := 0; i < ms.Len(); i++ {
ms.At(i).SetName(getTestMetricName(1, i))
}
}

if b.N > 100000 {
b.Skipf("SKIP: b.N too high, set -benchtime=<n>x with n < 100000")
}

dataPointCount := metricDPC(md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0))
clones := make([]pmetric.Metrics, b.N)
for n := 0; n < b.N; n++ {
clones[n] = md.Clone()
}
b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
cloneReq := clones[n]
split := splitMetrics(128*dataPointCount, cloneReq)
if split.MetricCount() != 128 || cloneReq.MetricCount() != 400-128 {
b.Fail()
}
}
}
51 changes: 0 additions & 51 deletions processor/batchprocessor/splittraces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,27 +127,6 @@ func TestSplitTracesMultipleResourceSpans_SplitSizeGreaterThanSpanSize(t *testin
assert.Equal(t, "test-span-1-4", split.ResourceSpans().At(1).ScopeSpans().At(0).Spans().At(4).Name())
}

func BenchmarkCloneSpans(b *testing.B) {
td := ptrace.NewTraces()
rms := td.ResourceSpans()
for i := 0; i < 20; i++ {
testdata.GenerateTraces(20).ResourceSpans().MoveAndAppendTo(td.ResourceSpans())
ms := rms.At(rms.Len() - 1).ScopeSpans().At(0).Spans()
for i := 0; i < ms.Len(); i++ {
ms.At(i).SetName(getTestMetricName(1, i))
}
}

b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
cloneReq := td.Clone()
if cloneReq.SpanCount() != 400 {
b.Fail()
}
}
}

func TestSplitTracesMultipleILS(t *testing.T) {
td := testdata.GenerateTraces(20)
spans := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans()
Expand Down Expand Up @@ -177,33 +156,3 @@ func TestSplitTracesMultipleILS(t *testing.T) {
assert.Equal(t, "test-span-0-0", split.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Name())
assert.Equal(t, "test-span-0-4", split.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(4).Name())
}

func BenchmarkSplitTraces(b *testing.B) {
td := ptrace.NewTraces()
rms := td.ResourceSpans()
for i := 0; i < 20; i++ {
testdata.GenerateTraces(20).ResourceSpans().MoveAndAppendTo(td.ResourceSpans())
ms := rms.At(rms.Len() - 1).ScopeSpans().At(0).Spans()
for i := 0; i < ms.Len(); i++ {
ms.At(i).SetName(getTestMetricName(1, i))
}
}

if b.N > 100000 {
b.Skipf("SKIP: b.N too high, set -benchtime=<n>x with n < 100000")
}

clones := make([]ptrace.Traces, b.N)
for n := 0; n < b.N; n++ {
clones[n] = td.Clone()
}
b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
cloneReq := clones[n]
split := splitTraces(128, cloneReq)
if split.SpanCount() != 128 || cloneReq.SpanCount() != 400-128 {
b.Fail()
}
}
}
5 changes: 1 addition & 4 deletions receiver/otlpreceiver/internal/logs/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ import (

func TestExport(t *testing.T) {
ld := testdata.GenerateLogs(1)
// Keep log data to compare the test result against it
// Clone needed because OTLP proto XXX_ fields are altered in the GRPC downstream
logData := ld.Clone()
req := plogotlp.NewRequestFromLogs(ld)

logSink := new(consumertest.LogsSink)
Expand All @@ -48,7 +45,7 @@ func TestExport(t *testing.T) {

lds := logSink.AllLogs()
require.Len(t, lds, 1)
assert.EqualValues(t, logData, lds[0])
assert.EqualValues(t, ld, lds[0])
}

func TestExport_EmptyRequest(t *testing.T) {
Expand Down
5 changes: 1 addition & 4 deletions receiver/otlpreceiver/internal/metrics/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ import (

func TestExport(t *testing.T) {
md := testdata.GenerateMetrics(1)
// Keep metric data to compare the test result against it
// Clone needed because OTLP proto XXX_ fields are altered in the GRPC downstream
metricData := md.Clone()
req := pmetricotlp.NewRequestFromMetrics(md)

metricSink := new(consumertest.MetricsSink)
Expand All @@ -49,7 +46,7 @@ func TestExport(t *testing.T) {

mds := metricSink.AllMetrics()
require.Len(t, mds, 1)
assert.EqualValues(t, metricData, mds[0])
assert.EqualValues(t, md, mds[0])
}

func TestExport_EmptyRequest(t *testing.T) {
Expand Down
5 changes: 1 addition & 4 deletions receiver/otlpreceiver/internal/trace/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ import (

func TestExport(t *testing.T) {
td := testdata.GenerateTraces(1)
// Keep trace data to compare the test result against it
// Clone needed because OTLP proto XXX_ fields are altered in the GRPC downstream
traceData := td.Clone()
req := ptraceotlp.NewRequestFromTraces(td)

traceSink := new(consumertest.TracesSink)
Expand All @@ -47,7 +44,7 @@ func TestExport(t *testing.T) {
require.NotNil(t, resp, "The response is missing")

require.Len(t, traceSink.AllTraces(), 1)
assert.EqualValues(t, traceData, traceSink.AllTraces()[0])
assert.EqualValues(t, td, traceSink.AllTraces()[0])
}

func TestExport_EmptyRequest(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion service/internal/fanoutconsumer/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ func (lsc *logsConsumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
// the incoming data to a mutating consumer is used that may change the incoming data before
// cloning.
for _, lc := range lsc.clone {
errs = multierr.Append(errs, lc.ConsumeLogs(ctx, ld.Clone()))
clonedLogs := plog.NewLogs()
ld.CopyTo(clonedLogs)
errs = multierr.Append(errs, lc.ConsumeLogs(ctx, clonedLogs))
}
for _, lc := range lsc.pass {
errs = multierr.Append(errs, lc.ConsumeLogs(ctx, ld))
Expand Down
4 changes: 3 additions & 1 deletion service/internal/fanoutconsumer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ func (msc *metricsConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metri
// the incoming data to a mutating consumer is used that may change the incoming data before
// cloning.
for _, mc := range msc.clone {
errs = multierr.Append(errs, mc.ConsumeMetrics(ctx, md.Clone()))
clonedMetrics := pmetric.NewMetrics()
md.CopyTo(clonedMetrics)
errs = multierr.Append(errs, mc.ConsumeMetrics(ctx, clonedMetrics))
}
for _, mc := range msc.pass {
errs = multierr.Append(errs, mc.ConsumeMetrics(ctx, md))
Expand Down
4 changes: 3 additions & 1 deletion service/internal/fanoutconsumer/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ func (tsc *tracesConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces)
// the incoming data to a mutating consumer is used that may change the incoming data before
// cloning.
for _, tc := range tsc.clone {
errs = multierr.Append(errs, tc.ConsumeTraces(ctx, td.Clone()))
clonedTraces := ptrace.NewTraces()
td.CopyTo(clonedTraces)
errs = multierr.Append(errs, tc.ConsumeTraces(ctx, clonedTraces))
}
for _, tc := range tsc.pass {
errs = multierr.Append(errs, tc.ConsumeTraces(ctx, td))
Expand Down

0 comments on commit 7740c7b

Please sign in to comment.