From a57bcc11a377f27d5b984a15f2edd28915c0afa8 Mon Sep 17 00:00:00 2001 From: Ruben van Staden Date: Thu, 5 Dec 2024 19:38:05 -0500 Subject: [PATCH 1/5] add signal types to result datastructure --- input/elasticapm/processor.go | 5 ++++- input/elasticapm/result.go | 22 +++++++++++++++++++--- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/input/elasticapm/processor.go b/input/elasticapm/processor.go index 63e25e05..f3d2de86 100644 --- a/input/elasticapm/processor.go +++ b/input/elasticapm/processor.go @@ -198,12 +198,16 @@ func (p *Processor) readBatch( err = v2.DecodeNestedError(reader, &input, batch) case metricsetEventType: err = v2.DecodeNestedMetricset(reader, &input, batch) + result.MetricAccepted++ case spanEventType: err = v2.DecodeNestedSpan(reader, &input, batch) + result.SpanAccepted++ case transactionEventType: err = v2.DecodeNestedTransaction(reader, &input, batch) + result.TransactionAccepted++ case logEventType: err = v2.DecodeNestedLog(reader, &input, batch) + result.LogAccepted++ case rumv3ErrorEventType: err = rumv3.DecodeNestedError(reader, &input, batch) case rumv3TransactionEventType: @@ -296,7 +300,6 @@ func (p *Processor) handleStream( if n == 0 { return readErr } - if err := p.processBatch(ctx, processor, batch); err != nil { return fmt.Errorf("cannot process batch: %w", err) } diff --git a/input/elasticapm/result.go b/input/elasticapm/result.go index 89e9df4c..ed037b80 100644 --- a/input/elasticapm/result.go +++ b/input/elasticapm/result.go @@ -23,18 +23,34 @@ import ( type Result struct { errorsSpace [5]error + // Errors holds a limited number of errors that occurred while // processing the event stream. If the limit is reached, the // counters above are still incremented. Errors []error + // Accepted holds the number of valid events accepted. - Accepted int + Accepted int + SpanAccepted int + TransactionAccepted int + MetricAccepted int + LogAccepted int + // TooLarge holds the number of events that were rejected due // to exceeding the event size limit. - TooLarge int + TooLarge int + SpanTooLarge int + TransactionTooLarge int + MetricTooLarge int + LogTooLarge int + // Invalid holds the number of events that were rejected due // to being invalid, excluding those that are counted by TooLarge. - Invalid int + Invalid int + SpanInvalid int + TransactionInvalid int + MetricInvalid int + LogInvalid int } func (r *Result) addError(err error) { From e857f202102d6c9017cc211020bb23c7d9f40d3c Mon Sep 17 00:00:00 2001 From: Ruben van Staden Date: Fri, 6 Dec 2024 17:50:28 -0500 Subject: [PATCH 2/5] incr signal type after batch processing --- input/elasticapm/processor.go | 23 +++++++++++++---------- input/elasticapm/processor_test.go | 8 ++++++-- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/input/elasticapm/processor.go b/input/elasticapm/processor.go index f3d2de86..377f9fec 100644 --- a/input/elasticapm/processor.go +++ b/input/elasticapm/processor.go @@ -198,16 +198,12 @@ func (p *Processor) readBatch( err = v2.DecodeNestedError(reader, &input, batch) case metricsetEventType: err = v2.DecodeNestedMetricset(reader, &input, batch) - result.MetricAccepted++ case spanEventType: err = v2.DecodeNestedSpan(reader, &input, batch) - result.SpanAccepted++ case transactionEventType: err = v2.DecodeNestedTransaction(reader, &input, batch) - result.TransactionAccepted++ case logEventType: err = v2.DecodeNestedLog(reader, &input, batch) - result.LogAccepted++ case rumv3ErrorEventType: err = rumv3.DecodeNestedError(reader, &input, batch) case rumv3TransactionEventType: @@ -300,18 +296,25 @@ func (p *Processor) handleStream( if n == 0 { return readErr } - if err := p.processBatch(ctx, processor, batch); err != nil { + if err := processor.ProcessBatch(ctx, batch); err != nil { return fmt.Errorf("cannot process batch: %w", err) } + for _, v := range *batch { + switch v.Type() { + case modelpb.SpanEventType: + result.SpanAccepted++ + case modelpb.TransactionEventType: + result.TransactionAccepted++ + case modelpb.MetricEventType: + result.MetricAccepted++ + case modelpb.LogEventType: + result.LogAccepted++ + } + } result.Accepted += n return readErr } -// processBatch processes the batch and returns the events to the pool after it's been processed. -func (p *Processor) processBatch(ctx context.Context, processor modelpb.BatchProcessor, batch *modelpb.Batch) error { - return processor.ProcessBatch(ctx, batch) -} - // getStreamReader returns a streamReader that reads ND-JSON lines from r. func (p *Processor) getStreamReader(r io.Reader) *streamReader { return &streamReader{ diff --git a/input/elasticapm/processor_test.go b/input/elasticapm/processor_test.go index 3dcea907..f1c655d4 100644 --- a/input/elasticapm/processor_test.go +++ b/input/elasticapm/processor_test.go @@ -35,6 +35,7 @@ import ( func TestHandleStreamReaderError(t *testing.T) { readErr := errors.New("read failed") + cnt := 5 var calls int var reader readerFunc = func(p []byte) (int, error) { calls++ @@ -43,7 +44,7 @@ func TestHandleStreamReaderError(t *testing.T) { } buf := bytes.NewBuffer(nil) buf.WriteString(validMetadata + "\n") - for i := 0; i < 5; i++ { + for i := 0; i < cnt; i++ { buf.WriteString(validTransaction + "\n") } return copy(p, buf.Bytes()), nil @@ -60,7 +61,10 @@ func TestHandleStreamReaderError(t *testing.T) { reader, 10, nopBatchProcessor{}, &actualResult, ) assert.ErrorIs(t, err, readErr) - assert.Equal(t, Result{Accepted: 5}, actualResult) + assert.Equal(t, Result{ + Accepted: cnt, + TransactionAccepted: cnt, + }, actualResult) } type readerFunc func([]byte) (int, error) From e82bbbabe8b10d35d4af6d4f4704484d443f7f05 Mon Sep 17 00:00:00 2001 From: Ruben van Staden Date: Tue, 10 Dec 2024 16:47:34 -0500 Subject: [PATCH 3/5] test: assert that result details arr properly populated --- input/elasticapm/processor.go | 10 +-- input/elasticapm/processor_test.go | 102 +++++++++++++++++++++-------- input/elasticapm/result.go | 28 ++++---- 3 files changed, 94 insertions(+), 46 deletions(-) diff --git a/input/elasticapm/processor.go b/input/elasticapm/processor.go index 377f9fec..877d8388 100644 --- a/input/elasticapm/processor.go +++ b/input/elasticapm/processor.go @@ -301,14 +301,16 @@ func (p *Processor) handleStream( } for _, v := range *batch { switch v.Type() { + case modelpb.ErrorEventType: + result.AcceptedDetails.Error++ case modelpb.SpanEventType: - result.SpanAccepted++ + result.AcceptedDetails.Span++ case modelpb.TransactionEventType: - result.TransactionAccepted++ + result.AcceptedDetails.Transaction++ case modelpb.MetricEventType: - result.MetricAccepted++ + result.AcceptedDetails.Metric++ case modelpb.LogEventType: - result.LogAccepted++ + result.AcceptedDetails.Log++ } } result.Accepted += n diff --git a/input/elasticapm/processor_test.go b/input/elasticapm/processor_test.go index f1c655d4..3f7af016 100644 --- a/input/elasticapm/processor_test.go +++ b/input/elasticapm/processor_test.go @@ -33,6 +33,14 @@ import ( "github.com/elastic/apm-data/model/modelpb" ) +const batchSize = 10 + +type readerFunc func([]byte) (int, error) + +func (f readerFunc) Read(p []byte) (int, error) { + return f(p) +} + func TestHandleStreamReaderError(t *testing.T) { readErr := errors.New("read failed") cnt := 5 @@ -57,22 +65,22 @@ func TestHandleStreamReaderError(t *testing.T) { var actualResult Result err := sp.HandleStream( - context.Background(), &modelpb.APMEvent{}, - reader, 10, nopBatchProcessor{}, &actualResult, + context.Background(), + &modelpb.APMEvent{}, + reader, + batchSize, + nopBatchProcessor{}, + &actualResult, ) assert.ErrorIs(t, err, readErr) assert.Equal(t, Result{ - Accepted: cnt, - TransactionAccepted: cnt, + Accepted: cnt, + AcceptedDetails: ProcessedDetail{ + Transaction: cnt, + }, }, actualResult) } -type readerFunc func([]byte) (int, error) - -func (f readerFunc) Read(p []byte) (int, error) { - return f(p) -} - func TestHandleStreamBatchProcessorError(t *testing.T) { payload := validMetadata + "\n" + validTransaction + "\n" for _, test := range []struct { @@ -89,11 +97,14 @@ func TestHandleStreamBatchProcessorError(t *testing.T) { processor := modelpb.ProcessBatchFunc(func(context.Context, *modelpb.Batch) error { return test.err }) - var actualResult Result err := sp.HandleStream( - context.Background(), &modelpb.APMEvent{}, - strings.NewReader(payload), 10, processor, &actualResult, + context.Background(), + &modelpb.APMEvent{}, + strings.NewReader(payload), + batchSize, + processor, + &actualResult, ) assert.ErrorIs(t, err, test.err) assert.Zero(t, actualResult) @@ -190,9 +201,12 @@ func TestHandleStreamErrors(t *testing.T) { Semaphore: semaphore.NewWeighted(1), }) err := p.HandleStream( - context.Background(), &modelpb.APMEvent{}, - strings.NewReader(test.payload), 10, - nopBatchProcessor{}, &actualResult, + context.Background(), + &modelpb.APMEvent{}, + strings.NewReader(test.payload), + batchSize, + nopBatchProcessor{}, + &actualResult, ) assert.Equal(t, test.err, err) assert.Zero(t, actualResult.Accepted) @@ -224,13 +238,31 @@ func TestHandleStream(t *testing.T) { MaxEventSize: 100 * 1024, Semaphore: semaphore.NewWeighted(1), }) + + var actualResult Result err := p.HandleStream( - context.Background(), &modelpb.APMEvent{}, - strings.NewReader(payload), 10, batchProcessor, - &Result{}, + context.Background(), + &modelpb.APMEvent{}, + strings.NewReader(payload), + batchSize, + batchProcessor, + &actualResult, ) require.NoError(t, err) + // Assert that batch result is properly populated. + assert.Equal(t, Result{ + Accepted: 5, + AcceptedDetails: ProcessedDetail{ + Transaction: 1, + Span: 1, + Metric: 1, + Log: 1, + Error: 1, + }, + }, actualResult) + + // Assert that processor is properly executed. processors := make([]modelpb.APMEventType, len(events)) for i, event := range events { processors[i] = event.Type() @@ -264,8 +296,11 @@ func TestHandleStreamRUMv3(t *testing.T) { }) var result Result err := p.HandleStream( - context.Background(), &modelpb.APMEvent{}, - strings.NewReader(payload), 10, batchProcessor, + context.Background(), + &modelpb.APMEvent{}, + strings.NewReader(payload), + batchSize, + batchProcessor, &result, ) require.NoError(t, err) @@ -315,8 +350,11 @@ func TestHandleStreamBaseEvent(t *testing.T) { Semaphore: semaphore.NewWeighted(1), }) err := p.HandleStream( - context.Background(), &baseEvent, - strings.NewReader(payload), 10, batchProcessor, + context.Background(), + &baseEvent, + strings.NewReader(payload), + batchSize, + batchProcessor, &Result{}, ) require.NoError(t, err) @@ -351,12 +389,18 @@ func TestLabelLeak(t *testing.T) { MaxEventSize: 100 * 1024, Semaphore: semaphore.NewWeighted(1), }) - var actualResult Result - err := p.HandleStream(context.Background(), baseEvent, strings.NewReader(payload), 10, batchProcessor, &actualResult) + err := p.HandleStream( + context.Background(), + baseEvent, + strings.NewReader(payload), + batchSize, + batchProcessor, + &Result{}) require.NoError(t, err) txs := processed assert.Len(t, txs, 2) + // Assert first tx assert.Equal(t, modelpb.NumericLabels{ "time_set": {Value: 1652185276}, @@ -369,8 +413,12 @@ func TestLabelLeak(t *testing.T) { }, modelpb.Labels(txs[0].Labels)) // Assert second tx - assert.Equal(t, modelpb.NumericLabels{"numeric": {Global: true, Value: 1}}, modelpb.NumericLabels(txs[1].NumericLabels)) - assert.Equal(t, modelpb.Labels{"ci_commit": {Global: true, Value: "unknown"}}, modelpb.Labels(txs[1].Labels)) + assert.Equal(t, modelpb.NumericLabels{ + "numeric": {Global: true, Value: 1}, + }, modelpb.NumericLabels(txs[1].NumericLabels)) + assert.Equal(t, modelpb.Labels{ + "ci_commit": {Global: true, Value: "unknown"}, + }, modelpb.Labels(txs[1].Labels)) } type nopBatchProcessor struct{} diff --git a/input/elasticapm/result.go b/input/elasticapm/result.go index ed037b80..9e167958 100644 --- a/input/elasticapm/result.go +++ b/input/elasticapm/result.go @@ -30,27 +30,25 @@ type Result struct { Errors []error // Accepted holds the number of valid events accepted. - Accepted int - SpanAccepted int - TransactionAccepted int - MetricAccepted int - LogAccepted int + Accepted int + AcceptedDetails ProcessedDetail // TooLarge holds the number of events that were rejected due // to exceeding the event size limit. - TooLarge int - SpanTooLarge int - TransactionTooLarge int - MetricTooLarge int - LogTooLarge int + TooLarge int // Invalid holds the number of events that were rejected due // to being invalid, excluding those that are counted by TooLarge. - Invalid int - SpanInvalid int - TransactionInvalid int - MetricInvalid int - LogInvalid int + Invalid int +} + +// ProcessedDetail holds the number of events processed for each type. +type ProcessedDetail struct { + Transaction int + Span int + Metric int + Log int + Error int } func (r *Result) addError(err error) { From d9fbbfccd737673cf6c4812764206e37f2d670db Mon Sep 17 00:00:00 2001 From: Ruben van Staden Date: Tue, 10 Dec 2024 20:29:58 -0500 Subject: [PATCH 4/5] add godoc comment for all Result fields --- input/elasticapm/result.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/input/elasticapm/result.go b/input/elasticapm/result.go index 9e167958..42995b69 100644 --- a/input/elasticapm/result.go +++ b/input/elasticapm/result.go @@ -29,8 +29,11 @@ type Result struct { // counters above are still incremented. Errors []error - // Accepted holds the number of valid events accepted. - Accepted int + // Accepted holds the total number of valid events accepted. + Accepted int + + // AcceptedDetails provides a detailed breakdown of the count + // of valid events categorized by signal type. AcceptedDetails ProcessedDetail // TooLarge holds the number of events that were rejected due From 754e7f5f07e7ed57daf0127c0b00d4a36f6107e6 Mon Sep 17 00:00:00 2001 From: Ruben van Staden Date: Tue, 10 Dec 2024 22:10:11 -0500 Subject: [PATCH 5/5] rename ProcessedDetail to AcceptedDetails --- input/elasticapm/processor_test.go | 4 ++-- input/elasticapm/result.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/input/elasticapm/processor_test.go b/input/elasticapm/processor_test.go index 3f7af016..fd343e53 100644 --- a/input/elasticapm/processor_test.go +++ b/input/elasticapm/processor_test.go @@ -75,7 +75,7 @@ func TestHandleStreamReaderError(t *testing.T) { assert.ErrorIs(t, err, readErr) assert.Equal(t, Result{ Accepted: cnt, - AcceptedDetails: ProcessedDetail{ + AcceptedDetails: AcceptedDetails{ Transaction: cnt, }, }, actualResult) @@ -253,7 +253,7 @@ func TestHandleStream(t *testing.T) { // Assert that batch result is properly populated. assert.Equal(t, Result{ Accepted: 5, - AcceptedDetails: ProcessedDetail{ + AcceptedDetails: AcceptedDetails{ Transaction: 1, Span: 1, Metric: 1, diff --git a/input/elasticapm/result.go b/input/elasticapm/result.go index 42995b69..191d2d2d 100644 --- a/input/elasticapm/result.go +++ b/input/elasticapm/result.go @@ -34,7 +34,7 @@ type Result struct { // AcceptedDetails provides a detailed breakdown of the count // of valid events categorized by signal type. - AcceptedDetails ProcessedDetail + AcceptedDetails AcceptedDetails // TooLarge holds the number of events that were rejected due // to exceeding the event size limit. @@ -46,7 +46,7 @@ type Result struct { } // ProcessedDetail holds the number of events processed for each type. -type ProcessedDetail struct { +type AcceptedDetails struct { Transaction int Span int Metric int